Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/14 11:04:39 UTC
[2/2] carbondata git commit: [CARBONDATA-1693][Streaming] Change SegmentStatus from String to enum
[CARBONDATA-1693][Streaming] Change SegmentStatus from String to enum
This closes #1480
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f0959a8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f0959a8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f0959a8
Branch: refs/heads/master
Commit: 2f0959a824b728e8f6193753767e1f58dee4017d
Parents: 1a62189
Author: Jacky Li <ja...@qq.com>
Authored: Sun Nov 12 22:50:15 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Nov 14 19:02:05 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 52 ---------
.../core/datastore/row/LoadStatusType.java | 37 -------
.../core/mutate/CarbonUpdateUtil.java | 25 ++---
.../core/mutate/SegmentUpdateDetails.java | 11 +-
.../reader/CarbonDeleteDeltaFileReaderImpl.java | 4 +-
.../core/statusmanager/LoadMetadataDetails.java | 15 ++-
.../core/statusmanager/SegmentStatus.java | 108 +++++++++++++++++++
.../statusmanager/SegmentStatusManager.java | 86 +++++++--------
.../SegmentUpdateStatusManager.java | 25 +++--
.../writer/CarbonDeleteDeltaWriterImpl.java | 4 +-
.../examples/SparkSessionExample.scala | 3 +-
.../hadoop/api/CarbonTableInputFormat.java | 8 +-
.../hadoop/util/BlockLevelTraverser.java | 2 +-
.../hadoop/test/util/StoreCreator.java | 3 +-
.../presto/util/CarbonDataStoreCreator.scala | 6 +-
.../MajorCompactionIgnoreInMinorTest.scala | 11 +-
.../org/apache/carbondata/api/CarbonStore.scala | 20 ++--
.../org/apache/carbondata/spark/KeyVal.scala | 10 +-
.../load/DataLoadProcessBuilderOnSpark.scala | 6 +-
.../spark/rdd/AlterTableAddColumnRDD.scala | 11 +-
.../spark/rdd/AlterTableDropColumnRDD.scala | 11 +-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 13 +--
.../spark/rdd/DataManagementFunc.scala | 18 +++-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 29 +++--
.../carbondata/spark/rdd/SparkDataMapJob.scala | 4 -
.../carbondata/spark/rdd/UpdateDataLoad.scala | 4 +-
.../carbondata/spark/util/CommonUtil.scala | 18 ++--
.../spark/util/GlobalDictionaryUtil.scala | 5 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 69 ++++++------
.../command/management/LoadTableCommand.scala | 7 +-
.../command/mutation/DeleteExecution.scala | 28 ++---
.../org/apache/spark/util/ShowSegments.scala | 88 ---------------
.../segmentreading/TestSegmentReading.scala | 4 +-
.../TestStreamingTableOperation.scala | 3 +-
.../apache/spark/util/CarbonCommandSuite.scala | 7 +-
.../processing/merger/CarbonDataMergerUtil.java | 42 ++++----
.../processing/util/CarbonLoaderUtil.java | 21 ++--
.../processing/util/DeleteLoadFolders.java | 6 +-
.../processing/util/LoadMetadataUtil.java | 47 --------
.../carbondata/processing/StoreCreator.java | 4 +-
.../streaming/segment/StreamSegment.java | 9 +-
41 files changed, 376 insertions(+), 508 deletions(-)
----------------------------------------------------------------------
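In short, this patch removes the String load-status constants from CarbonCommonConstants (and the LoadStatusType enum) in favour of a single SegmentStatus enum, so status checks move from equalsIgnoreCase comparisons on Strings to identity comparisons on enum values. A minimal before/after sketch of that pattern, using only names that appear in the diff below:

    // before: segment status held as a free-form String constant
    if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) {
      // segment is visible for read
    }

    // after: segment status held as a SegmentStatus enum, compared by ==
    if (segment.getSegmentStatus() == SegmentStatus.SUCCESS) {
      // segment is visible for read
    }
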
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8018c2b..5c04e8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -512,15 +512,6 @@ public final class CarbonCommonConstants {
*/
public static final String ENABLE_DATA_LOADING_STATISTICS_DEFAULT = "false";
/**
- * IS_INT_BASED_INDEXER
- */
- @CarbonProperty
- public static final String HIGH_CARDINALITY_VALUE = "high.cardinality.value";
- /**
- * IS_INT_BASED_INDEXER_DEFAULTVALUE
- */
- public static final String HIGH_CARDINALITY_VALUE_DEFAULTVALUE = "100000";
- /**
* CONSTANT_SIZE_TEN
*/
public static final int CONSTANT_SIZE_TEN = 10;
@@ -528,41 +519,8 @@ public final class CarbonCommonConstants {
* LEVEL_METADATA_FILE
*/
public static final String LEVEL_METADATA_FILE = "levelmetadata_";
- /**
- * LOAD_STATUS SUCCESS
- */
- public static final String STORE_LOADSTATUS_SUCCESS = "Success";
- /**
- * LOAD_STATUS UPDATE
- */
- public static final String STORE_LOADSTATUS_UPDATE = "Update";
- /**
- * LOAD_STATUS FAILURE
- */
- public static final String STORE_LOADSTATUS_FAILURE = "Failure";
- /**
- * LOAD_STATUS PARTIAL_SUCCESS
- */
- public static final String STORE_LOADSTATUS_PARTIAL_SUCCESS = "Partial Success";
/**
- * STORE_LOADSTATUS_STREAMING
- */
- public static final String STORE_LOADSTATUS_STREAMING = "Streaming";
-
- /**
- * STORE_LOADSTATUS_STREAMING
- */
- public static final String STORE_LOADSTATUS_STREAMING_FINISH = "Streaming Finish";
- /**
- * LOAD_STATUS
- */
- public static final String CARBON_METADATA_EXTENSION = ".metadata";
- /**
- * LOAD_STATUS
- */
- public static final String CARBON_DEFAULT_STREAM_ENCODEFORMAT = "UTF-8";
- /**
* COMMA
*/
public static final String COMMA = ",";
@@ -632,11 +590,6 @@ public final class CarbonCommonConstants {
*/
public static final String DEFAULT_COMPRESSOR = "snappy";
- /**
- * MARKED_FOR_DELETION
- */
- public static final String MARKED_FOR_DELETE = "Marked for Delete";
- public static final String MARKED_FOR_UPDATE = "Marked for Update";
public static final String STRING_TYPE = "StringType";
public static final String INTEGER_TYPE = "IntegerType";
public static final String LONG_TYPE = "LongType";
@@ -915,11 +868,6 @@ public final class CarbonCommonConstants {
public static final long CARBON_256MB = 256 * 1024 * 1024;
/**
- * COMPACTED is property to indicate whether seg is compacted or not.
- */
- public static final String COMPACTED = "Compacted";
-
- /**
* ZOOKEEPERLOCK TYPE
*/
public static final String CARBON_LOCK_TYPE_ZOOKEEPER = "ZOOKEEPERLOCK";
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
deleted file mode 100644
index d3c5a94..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.row;
-
-/**
- * Load status type
- */
-public enum LoadStatusType {
-
- INSERT_OVERWRITE("Overwrite In Progress"), // if insert overwrite operation is in progress
- IN_PROGRESS("In Progress"); // if load, insert into operation is in progress
-
- private String message;
-
- LoadStatusType(String message) {
- this.message = message;
- }
-
- public String getMessage() {
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 57e1544..6589ee5 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.mutate.data.RowCountDetailsVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
@@ -145,7 +146,7 @@ public class CarbonUpdateUtil {
.setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
}
blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
- blockDetail.setStatus(newBlockEntry.getStatus());
+ blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus());
blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
} else {
// add the new details to the list.
@@ -224,7 +225,7 @@ public class CarbonUpdateUtil {
// if the segments is in the list of marked for delete then update the status.
if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) {
- loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+ loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
}
}
@@ -508,9 +509,8 @@ public class CarbonUpdateUtil {
// if this segment is valid then only we will go for delta file deletion.
// if the segment is mark for delete or compacted then any way it will get deleted.
- if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- || segment.getLoadStatus()
- .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+ if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
+ || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
// take the list of files from this segment.
String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
@@ -565,7 +565,7 @@ public class CarbonUpdateUtil {
}
// case 1
- if (CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+ if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
completeListOfDeleteDeltaFiles = updateStatusManager
.getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
allSegmentFiles);
@@ -687,17 +687,8 @@ public class CarbonUpdateUtil {
}
}
- /**
- *
- * @param blockStatus
- * @return
- */
- public static boolean isBlockInvalid(String blockStatus) {
- if (blockStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED) || blockStatus
- .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
- return true;
- }
- return false;
+ public static boolean isBlockInvalid(SegmentStatus blockStatus) {
+ return blockStatus == SegmentStatus.COMPACTED || blockStatus == SegmentStatus.MARKED_FOR_DELETE;
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
index 6f99b3a..583e2ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
/**
* This class stores the segment details of table update status file
@@ -30,7 +31,7 @@ public class SegmentUpdateDetails implements Serializable {
private static final long serialVersionUID = 1206104914918491724L;
private String segmentName;
private String blockName;
- private String status = "";
+ private SegmentStatus segmentStatus;
private String deleteDeltaEndTimestamp = "";
private String deleteDeltaStartTimestamp = "";
private String actualBlockName;
@@ -74,12 +75,12 @@ public class SegmentUpdateDetails implements Serializable {
this.deleteDeltaStartTimestamp = deleteDeltaStartTimestamp;
}
- public void setStatus(String status) {
- this.status = status;
+ public void setSegmentStatus(SegmentStatus segmentStatus) {
+ this.segmentStatus = segmentStatus;
}
- public String getStatus() {
- return this.status;
+ public SegmentStatus getSegmentStatus() {
+ return this.segmentStatus;
}
@Override public int hashCode() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
index b75d796..1c0b586 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
@@ -80,7 +80,7 @@ public class CarbonDeleteDeltaFileReaderImpl implements CarbonDeleteDeltaFileRea
StringWriter sw = new StringWriter();
dataInputStream = FileFactory.getDataInputStream(filePath, fileType);
inputStream = new InputStreamReader(dataInputStream,
- CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+ CarbonCommonConstants.DEFAULT_CHARSET);
int n = 0;
while (-1 != (n = inputStream.read(buffer))) {
sw.write(buffer, 0, n);
@@ -108,7 +108,7 @@ public class CarbonDeleteDeltaFileReaderImpl implements CarbonDeleteDeltaFileRea
}
dataInputStream = fileOperation.openForRead();
inStream = new InputStreamReader(dataInputStream,
- CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+ CarbonCommonConstants.DEFAULT_CHARSET);
buffReader = new BufferedReader(inStream);
deleteDeltaBlockDetails =
gsonObjectToRead.fromJson(buffReader, DeleteDeltaBlockDetails.class);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 51a04e4..f42ca23 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -30,9 +30,16 @@ public class LoadMetadataDetails implements Serializable {
private static final long serialVersionUID = 1106104914918491724L;
private String timestamp;
- private String loadStatus;
+
+ // For backward compatibility, this member is required to read from JSON in the table_status file
+ private SegmentStatus loadStatus;
+
+ // name of the segment
private String loadName;
+
+ // partition count of this segment
private String partitionCount;
+
private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE;
// update delta end timestamp
@@ -94,12 +101,12 @@ public class LoadMetadataDetails implements Serializable {
this.timestamp = getTimeStampConvertion(timestamp);;
}
- public String getLoadStatus() {
+ public SegmentStatus getSegmentStatus() {
return loadStatus;
}
- public void setLoadStatus(String loadStatus) {
- this.loadStatus = loadStatus;
+ public void setSegmentStatus(SegmentStatus segmentStatus) {
+ this.loadStatus = segmentStatus;
}
public String getLoadName() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
new file mode 100644
index 0000000..3b62ec9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Status of one segment. This enum is serialized into table_status file, so
+ * please ensure the SerializedName is backward compatible when modifying this enum
+ */
+public enum SegmentStatus {
+
+ /**
+ * Data load success, it is visible for read
+ */
+ @SerializedName("Success")
+ SUCCESS("Success"),
+
+ /**
+ * Data load failed
+ */
+ @SerializedName("Failure")
+ LOAD_FAILURE("Failure"),
+
+ /**
+ * Data load partial success
+ */
+ @SerializedName("Partial Success")
+ LOAD_PARTIAL_SUCCESS("Partial Success"),
+
+ /**
+ * Segment has been deleted by user or compactor
+ */
+ @SerializedName("Marked for Delete")
+ MARKED_FOR_DELETE("Marked for Delete"),
+
+ /**
+ * Segment has been updated by user
+ */
+ @SerializedName("Marked for Update")
+ MARKED_FOR_UPDATE("Marked for Update"),
+
+ /**
+ * Segment is compacted
+ */
+ @SerializedName("Compacted")
+ COMPACTED("Compacted"),
+
+ /**
+ * Insert overwrite operation is in progress
+ */
+ @SerializedName("Overwrite In Progress") // This string can't be modified due to compatibility
+ INSERT_OVERWRITE_IN_PROGRESS("Insert Overwrite In Progress"),
+
+ /**
+ * insert into operation is in progress
+ */
+ @SerializedName("In Progress") // This string can't be modified due to compatibility
+ INSERT_IN_PROGRESS("Insert In Progress"),
+
+ /**
+ * Streaming ingest in progress, for streaming segment
+ */
+ @SerializedName("Streaming")
+ STREAMING("Streaming"),
+
+ /**
+ * Streaming ingest finish, for streaming segment
+ */
+ @SerializedName("Streaming Finish")
+ STREAMING_FINISH("Streaming Finish"),
+
+ /**
+ * This status is not used, keep it here just for backward compatibility
+ */
+ @SerializedName("Update")
+ UPDATE("Update");
+
+ private String message;
+
+ SegmentStatus(String message) {
+ this.message = message;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public String toString() {
+ return message;
+ }
+}
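LoadMetadataDetails is read from and written to the table_status file with Gson, so each enum constant above carries a @SerializedName annotation matching the legacy String value, and status files written by older versions keep deserializing. A minimal round-trip sketch (assuming standard Gson 2.x behaviour; the class name is illustrative only, not part of this patch):

    import com.google.gson.Gson;

    public class SegmentStatusJsonSketch {
      public static void main(String[] args) {
        Gson gson = new Gson();

        // writing: the enum serializes to its legacy String value, here "Marked for Delete"
        String json = gson.toJson(SegmentStatus.MARKED_FOR_DELETE);

        // reading: a value written by an older version maps back onto the enum
        SegmentStatus status = gson.fromJson("\"Partial Success\"", SegmentStatus.class);
        // status == SegmentStatus.LOAD_PARTIAL_SUCCESS
      }
    }
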
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 30304d9..a42239f 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.row.LoadStatusType;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
@@ -109,7 +108,10 @@ public class SegmentStatusManager {
absoluteTableIdentifier.getCarbonTableIdentifier());
String dataPath = carbonTablePath.getTableStatusFilePath();
DataInputStream dataInputStream = null;
- Gson gsonObjectToRead = new Gson();
+
+ // Use GSON to deserialize the load information
+ Gson gson = new Gson();
+
AtomicFileOperations fileOperation =
new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath));
LoadMetadataDetails[] loadFolderDetailsArray;
@@ -117,53 +119,41 @@ public class SegmentStatusManager {
if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {
dataInputStream = fileOperation.openForRead();
BufferedReader buffReader =
- new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8"));
- loadFolderDetailsArray = gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
+ new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8"));
+ loadFolderDetailsArray = gson.fromJson(buffReader, LoadMetadataDetails[].class);
//just directly iterate Array
- for (LoadMetadataDetails loadMetadataDetails : loadFolderDetailsArray) {
- if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.MARKED_FOR_UPDATE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+ for (LoadMetadataDetails segment : loadFolderDetailsArray) {
+ if (SegmentStatus.SUCCESS == segment.getSegmentStatus() ||
+ SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus() ||
+ SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus() ||
+ SegmentStatus.STREAMING == segment.getSegmentStatus() ||
+ SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
// check for merged loads.
- if (null != loadMetadataDetails.getMergedLoadName()) {
- if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
- listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
+ if (null != segment.getMergedLoadName()) {
+ if (!listOfValidSegments.contains(segment.getMergedLoadName())) {
+ listOfValidSegments.add(segment.getMergedLoadName());
}
// if merged load is updated then put it in updated list
- if (CarbonCommonConstants.MARKED_FOR_UPDATE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
- listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
+ if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
+ listOfValidUpdatedSegments.add(segment.getMergedLoadName());
}
continue;
}
- if (CarbonCommonConstants.MARKED_FOR_UPDATE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+ if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
- listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
+ listOfValidUpdatedSegments.add(segment.getLoadName());
}
- if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
- listOfStreamSegments.add(loadMetadataDetails.getLoadName());
+ if (SegmentStatus.STREAMING == segment.getSegmentStatus() ||
+ SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
+ listOfStreamSegments.add(segment.getLoadName());
continue;
}
- listOfValidSegments.add(loadMetadataDetails.getLoadName());
- } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.COMPACTED
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.MARKED_FOR_DELETE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
- listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
+ listOfValidSegments.add(segment.getLoadName());
+ } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() ||
+ SegmentStatus.COMPACTED == segment.getSegmentStatus() ||
+ SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
+ listOfInvalidSegments.add(segment.getLoadName());
}
}
}
@@ -489,15 +479,14 @@ public class SegmentStatusManager {
if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) {
// if the segment is compacted then no need to delete that.
- if (CarbonCommonConstants.COMPACTED
- .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+ if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) {
LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId);
invalidLoadIds.add(loadId);
return invalidLoadIds;
}
- if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) {
+ if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
loadFound = true;
- loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+ loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
LOG.info("Segment ID " + loadId + " Marked for Delete");
}
@@ -533,15 +522,14 @@ public class SegmentStatusManager {
for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
if (result < 0) {
- if (CarbonCommonConstants.COMPACTED
- .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+ if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) {
LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
+ "as the segment has been compacted.");
continue;
}
- if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())
- && !LoadStatusType.IN_PROGRESS.getMessage().equals(loadMetadata.getLoadStatus())
- && !LoadStatusType.INSERT_OVERWRITE.getMessage().equals(loadMetadata.getLoadStatus())) {
+ if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus() &&
+ SegmentStatus.INSERT_IN_PROGRESS != loadMetadata.getSegmentStatus() &&
+ SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != loadMetadata.getSegmentStatus()) {
loadFound = true;
updateSegmentMetadataDetails(loadMetadata);
LOG.info("Info: " +
@@ -594,7 +582,7 @@ public class SegmentStatusManager {
List<LoadMetadataDetails> newListMetadata =
new ArrayList<LoadMetadataDetails>(Arrays.asList(newMetadata));
for (LoadMetadataDetails oldSegment : oldMetadata) {
- if (CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oldSegment.getLoadStatus())) {
+ if (SegmentStatus.MARKED_FOR_DELETE == oldSegment.getSegmentStatus()) {
updateSegmentMetadataDetails(newListMetadata.get(newListMetadata.indexOf(oldSegment)));
}
}
@@ -608,8 +596,8 @@ public class SegmentStatusManager {
*/
public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
// update status only if the segment is not marked for delete
- if (!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus())) {
- loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+ if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
+ loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 1218231..982c598 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -325,7 +325,7 @@ public class SegmentUpdateStatusManager {
List<String> blockNames = new ArrayList<String>();
for (SegmentUpdateDetails block : updateDetails) {
if (block.getSegmentName().equalsIgnoreCase(segmentName) && !CarbonUpdateUtil
- .isBlockInvalid(block.getStatus())) {
+ .isBlockInvalid(block.getSegmentStatus())) {
blockNames.add(block.getBlockName());
}
}
@@ -343,11 +343,8 @@ public class SegmentUpdateStatusManager {
SegmentUpdateDetails details = getDetailsForABlock(segName, blockName);
- if (details == null || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
- return true;
- }
+ return details == null || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus());
- return false;
}
/**
* Returns all delta file paths of specified block
@@ -363,8 +360,9 @@ public class SegmentUpdateStatusManager {
String segment) {
List<String> deleteFileList = new ArrayList<>();
for (SegmentUpdateDetails block : updateDetails) {
- if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName()
- .equalsIgnoreCase(segment) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+ if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) &&
+ block.getSegmentName().equalsIgnoreCase(segment) &&
+ !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block);
// If there is no delete delete file , then return null
if (deltaStartTimestamp == 0) {
@@ -434,7 +432,7 @@ public class SegmentUpdateStatusManager {
for (SegmentUpdateDetails block : updateDetails) {
if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
(block.getSegmentName().equalsIgnoreCase(segmentId))
- && !CarbonUpdateUtil.isBlockInvalid((block.getStatus()))) {
+ && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
final long deltaStartTimestamp =
getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
final long deltaEndTimeStamp =
@@ -605,7 +603,7 @@ public class SegmentUpdateStatusManager {
for (SegmentUpdateDetails blockDetails : getUpdateStatusDetails()) {
if (blockDetails.getActualBlockName().equalsIgnoreCase(eachFile.getName())
- && CarbonUpdateUtil.isBlockInvalid(blockDetails.getStatus())) {
+ && CarbonUpdateUtil.isBlockInvalid(blockDetails.getSegmentStatus())) {
validBlock = false;
}
}
@@ -699,7 +697,7 @@ public class SegmentUpdateStatusManager {
}
dataInputStream = fileOperation.openForRead();
inStream = new InputStreamReader(dataInputStream,
- CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+ CarbonCommonConstants.DEFAULT_CHARSET);
buffReader = new BufferedReader(inStream);
listOfSegmentUpdateDetailsArray =
gsonObjectToRead.fromJson(buffReader, SegmentUpdateDetails[].class);
@@ -755,7 +753,7 @@ public class SegmentUpdateStatusManager {
try {
dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
- CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+ CarbonCommonConstants.DEFAULT_CHARSET));
String metadataInstance = gsonObjectToWrite.toJson(listOfSegmentUpdateDetailsArray);
brWriter.write(metadataInstance);
@@ -792,8 +790,9 @@ public class SegmentUpdateStatusManager {
SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
readLoadMetadata();
for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
- if (segmentId.equalsIgnoreCase(block.getSegmentName()) && block.getBlockName()
- .equalsIgnoreCase(blockName) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+ if (segmentId.equalsIgnoreCase(block.getSegmentName()) &&
+ block.getBlockName().equalsIgnoreCase(blockName) &&
+ !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
long deleteTimestampFromStatusFile = block.getDeleteDeltaEndTimeAsLong();
if (Long.compare(deleteTimestampFromStatusFile, cacheTimestamp) == 0) {
return null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
index 3bf61f1..1802515 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
@@ -70,7 +70,7 @@ public class CarbonDeleteDeltaWriterImpl implements CarbonDeleteDeltaWriter {
FileFactory.createNewFile(filePath, fileType);
dataOutStream = FileFactory.getDataOutputStream(filePath, fileType);
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
- CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+ CarbonCommonConstants.DEFAULT_CHARSET));
brWriter.write(value);
} catch (IOException ioe) {
LOGGER.error("Error message: " + ioe.getLocalizedMessage());
@@ -98,7 +98,7 @@ public class CarbonDeleteDeltaWriterImpl implements CarbonDeleteDeltaWriter {
dataOutStream = FileFactory.getDataOutputStream(filePath, fileType);
Gson gsonObjectToWrite = new Gson();
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
- CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+ CarbonCommonConstants.DEFAULT_CHARSET));
String deletedData = gsonObjectToWrite.toJson(deleteDeltaBlockDetails);
brWriter.write(deletedData);
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
index b823be5..df1e68f 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.spark.sql.examples
+package org.apache.carbondata.examples
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.{CleanFiles, ShowSegments}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 92ef6da..f3963ad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -321,12 +321,10 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
List<String> streamSegments = null;
List<String> filteredSegmentToAccess = new ArrayList<>();
if (getValidateSegmentsToAccess(job.getConfiguration())) {
-
String[] segmentsToAccess = getSegmentsToAccess(job);
Set<String> segmentToAccessSet = new HashSet<>();
- for (String segmentToAccess : segmentsToAccess) {
- segmentToAccessSet.add(segmentToAccess);
- }
+ segmentToAccessSet.addAll(Arrays.asList(segmentsToAccess));
+
// get all valid segments and set them into the configuration
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
@@ -903,7 +901,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
// if block is invalid then dont add the count
SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
- if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+ if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
Long blockCount = blockRowCountMapping.get(key);
if (blockCount == null) {
blockCount = 0L;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
index b814468..89a4a9a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
@@ -74,7 +74,7 @@ public class BlockLevelTraverser {
// if block is invalid then dont add the count
SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
- if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+ if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
blockRowMap.put(key, rowCount);
count++;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index dd48cd9..3b5d736 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -61,6 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -455,7 +456,7 @@ public class StoreCreator {
String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
- loadMetadataDetails.setLoadStatus("SUCCESS");
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
loadMetadataDetails.setLoadName(String.valueOf(0));
loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
listOfLoadFolderDetails.add(loadMetadataDetails);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index e932213..e8f5d6d 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -25,7 +25,7 @@ import java.util.{ArrayList, Date, List, UUID}
import scala.collection.JavaConversions._
-import com.google.gson.Gson
+import com.google.gson.{Gson, GsonBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
@@ -46,7 +46,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
@@ -523,7 +523,7 @@ object CarbonDataStoreCreator {
try {
val loadMetadataDetails: LoadMetadataDetails = new LoadMetadataDetails()
loadMetadataDetails.setLoadEndTime(System.currentTimeMillis())
- loadMetadataDetails.setLoadStatus("SUCCESS")
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
loadMetadataDetails.setLoadName(String.valueOf(0))
loadMetadataDetails.setLoadStartTime(
loadMetadataDetails.getTimeStamp(readCurrentTime()))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index b07c82a..111ede7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -19,12 +19,13 @@ package org.apache.carbondata.spark.testsuite.datacompaction
import scala.collection.JavaConverters._
import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier
import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.hadoop.CacheClient
import org.apache.spark.sql.test.util.QueryTest
@@ -118,8 +119,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
// status should remain as compacted.
- assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED))
-
+ assertResult(SegmentStatus.COMPACTED)(segs(3).getSegmentStatus)
}
/**
@@ -140,10 +140,9 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
// status should remain as compacted for segment 2.
- assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED))
+ assertResult(SegmentStatus.COMPACTED)(segs(3).getSegmentStatus)
// for segment 0.1 . should get deleted
- assert(segs(2).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE))
-
+ assertResult(SegmentStatus.MARKED_FOR_DELETE)(segs(2).getSegmentStatus)
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 054f778..cf6a032 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -18,7 +18,6 @@
package org.apache.carbondata.api
import java.lang.Long
-import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
@@ -29,7 +28,7 @@ import org.apache.spark.sql.types.TimestampType
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -47,7 +46,6 @@ object CarbonStore {
tableFolderPath: String): Seq[Row] = {
val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(tableFolderPath)
if (loadMetadataDetailsArray.nonEmpty) {
- val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { (l1, l2) =>
java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double.parseDouble(l2.getLoadName)
}
@@ -66,11 +64,12 @@ object CarbonStore {
loadMetadataDetailsSortedArray
.filter(_.getVisibility.equalsIgnoreCase("true"))
.map { load =>
- val mergedTo = if (load.getMergedLoadName != null) {
- load.getMergedLoadName
- } else {
- ""
- }
+ val mergedTo =
+ if (load.getMergedLoadName != null) {
+ load.getMergedLoadName
+ } else {
+ "NA"
+ }
val startTime =
if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
@@ -88,11 +87,10 @@ object CarbonStore {
Row(
load.getLoadName,
- load.getLoadStatus,
+ load.getSegmentStatus.getMessage,
startTime,
endTime,
- mergedTo
- )
+ mergedTo)
}.toSeq
} else {
Seq.empty
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index 7cf8c88..bbf242e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -28,7 +28,7 @@ package org.apache.carbondata.spark
import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.carbondata.core.mutate.SegmentUpdateDetails
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
trait Value[V] extends Serializable {
@@ -76,13 +76,13 @@ class updateResultImpl
}
trait DeleteDelataResult[K, V] extends Serializable {
- def getKey(key: String, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V)
+ def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V)
}
class DeleteDelataResultImpl
- extends DeleteDelataResult[String, (SegmentUpdateDetails, ExecutionErrors)] {
- override def getKey(key: String,
- value: (SegmentUpdateDetails, ExecutionErrors)): (String, (SegmentUpdateDetails,
+ extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors)] {
+ override def getKey(key: SegmentStatus,
+ value: (SegmentUpdateDetails, ExecutionErrors)): (SegmentStatus, (SegmentUpdateDetails,
ExecutionErrors)) = {
(key, value)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index bdf9a71..ca2ca2e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
@@ -144,14 +144,14 @@ object DataLoadProcessBuilderOnSpark {
val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
"Partial_Success"
val loadMetadataDetails = new LoadMetadataDetails()
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
executionErrors.failureCauses = FailureCauses.BAD_RECORDS
Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
} else {
val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
val loadMetadataDetails = new LoadMetadataDetails()
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 2fb72f7..2fbc7a3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.spark.util.GlobalDictionaryUtil
@@ -52,7 +53,7 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
@transient newColumns: Seq[ColumnSchema],
carbonTableIdentifier: CarbonTableIdentifier,
carbonStorePath: String)
- extends CarbonRDD[(Int, String)](sc, Nil) {
+ extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
@@ -64,10 +65,10 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
}
override def internalCompute(split: Partition,
- context: TaskContext): Iterator[(Int, String)] = {
+ context: TaskContext): Iterator[(Int, SegmentStatus)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- val iter = new Iterator[(Int, String)] {
+ val status = SegmentStatus.SUCCESS
+ val iter = new Iterator[(Int, SegmentStatus)] {
try {
val columnSchema = split.asInstanceOf[AddColumnPartition].columnSchema
// create dictionary file if it is a dictionary column
@@ -111,7 +112,7 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
}
}
- override def next(): (Int, String) = {
+ override def next(): (Int, SegmentStatus) = {
(split.index, status)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index fde5cd6..5b0889f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.SegmentStatus
/**
* This is a partitioner class for dividing the newly added columns into partitions
@@ -49,7 +50,7 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
@transient newColumns: Seq[ColumnSchema],
carbonTableIdentifier: CarbonTableIdentifier,
carbonStorePath: String)
- extends CarbonRDD[(Int, String)](sc, Nil) {
+ extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
override def getPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
@@ -58,10 +59,10 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
}
override def internalCompute(split: Partition,
- context: TaskContext): Iterator[(Int, String)] = {
+ context: TaskContext): Iterator[(Int, SegmentStatus)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- val iter = new Iterator[(Int, String)] {
+ val status = SegmentStatus.SUCCESS
+ val iter = new Iterator[(Int, SegmentStatus)] {
try {
val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
@@ -87,7 +88,7 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
}
}
- override def next(): (Int, String) = {
+ override def next(): (Int, SegmentStatus) = {
(split.index, status)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index b2e0c47..c1c7bf1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.service.{CarbonCommonFactory, PathService}
+import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -333,17 +334,17 @@ class CarbonBlockDistinctValuesCombineRDD(
class CarbonGlobalDictionaryGenerateRDD(
prev: RDD[(Int, ColumnDistinctValues)],
model: DictionaryLoadModel)
- extends CarbonRDD[(Int, String)](prev) {
+ extends CarbonRDD[(Int, SegmentStatus)](prev) {
override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
override def internalCompute(split: Partition,
- context: TaskContext): Iterator[(Int, String)] = {
+ context: TaskContext): Iterator[(Int, SegmentStatus)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
model.hdfsLocation)
- var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- val iter = new Iterator[(Int, String)] {
+ var status = SegmentStatus.SUCCESS
+ val iter = new Iterator[(Int, SegmentStatus)] {
var dictionaryForDistinctValueLookUp: Dictionary = _
var dictionaryForSortIndexWriting: Dictionary = _
var dictionaryForDistinctValueLookUpCleared: Boolean = false
@@ -452,7 +453,7 @@ class CarbonGlobalDictionaryGenerateRDD(
} catch {
case dictionaryException: NoRetryException =>
LOGGER.error(dictionaryException)
- status = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ status = SegmentStatus.LOAD_FAILURE
case ex: Exception =>
LOGGER.error(ex)
throw ex
@@ -485,7 +486,7 @@ class CarbonGlobalDictionaryGenerateRDD(
}
}
- override def next(): (Int, String) = {
+ override def next(): (Int, SegmentStatus) = {
(split.index, status)
}
}
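
CarbonGlobalDictionaryGenerateRDD starts from SegmentStatus.SUCCESS and downgrades to LOAD_FAILURE when a NoRetryException is caught, while other exceptions propagate. A condensed sketch of that pattern, with work() as a hypothetical stand-in for the dictionary generation step:

import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.processing.loading.exception.NoRetryException

object DictionaryStatusSketch {
  // Run the supplied work and report a SegmentStatus instead of throwing for the
  // one exception type the RDD treats as recoverable; anything else still propagates.
  def runWithStatus(work: () => Unit): SegmentStatus = {
    var status = SegmentStatus.SUCCESS
    try {
      work()
    } catch {
      case _: NoRetryException =>
        status = SegmentStatus.LOAD_FAILURE
    }
    status
  }
}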
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index cbdb336..3a74dfa 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -30,10 +30,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, LoadMetadataUtil}
+import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders}
import org.apache.carbondata.spark.compaction.CompactionCallable
import org.apache.carbondata.spark.util.CommonUtil
@@ -162,13 +162,25 @@ object DataManagementFunc {
}
}
+ private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
+ val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
+ if (details != null && details.nonEmpty) for (oneRow <- details) {
+ if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
+ SegmentStatus.COMPACTED == oneRow.getSegmentStatus) &&
+ oneRow.getVisibility.equalsIgnoreCase("true")) {
+ return true
+ }
+ }
+ false
+ }
+
def deleteLoadsAndUpdateMetadata(
dbName: String,
tableName: String,
storePath: String,
isForceDeletion: Boolean,
carbonTable: CarbonTable): Unit = {
- if (LoadMetadataUtil.isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
+ if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
val carbonTableStatusLock =
CarbonLockFactory.getCarbonLockObj(
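
The new private isLoadDeletionRequired above replaces the removed LoadMetadataUtil helper. The same check can also be expressed with exists; the following is only an equivalent sketch built on the accessors visible in the diff, not a proposed change to the commit:

import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}

object LoadDeletionSketch {
  // Same check as the inlined helper: deletion is required when any visible
  // segment is marked for delete or already compacted.
  def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
    details != null && details.exists { row =>
      (row.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE ||
        row.getSegmentStatus == SegmentStatus.COMPACTED) &&
        row.getVisibility.equalsIgnoreCase("true")
    }
  }
}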
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 74f7528..6f44a0d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
@@ -212,7 +212,7 @@ class NewCarbonDataLoadRDD[K, V](
carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
try {
loadMetadataDetails.setPartitionCount(partitionID)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.USE_PREFETCH_WHILE_LOADING, CarbonCommonConstants.USE_PREFETCH_WHILE_LOADING_DEFAULT)
@@ -233,12 +233,12 @@ class NewCarbonDataLoadRDD[K, V](
recordReaders)
} catch {
case e: NoRetryException =>
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
executionErrors.failureCauses = FailureCauses.BAD_RECORDS
executionErrors.errorMsg = e.getMessage
logInfo("Bad Record Found")
case e: Exception =>
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
logInfo("DataLoad failure", e)
LOGGER.error(e)
throw e
@@ -247,8 +247,7 @@ class NewCarbonDataLoadRDD[K, V](
CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
// in case of failure the same operation will be re-tried several times.
// So print the data load statistics only in case of non failure case
- if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- .equals(loadMetadataDetails.getLoadStatus)) {
+ if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance
.printStatisticsInfo(model.getPartitionId)
}
@@ -346,7 +345,7 @@ class NewDataFrameLoaderRDD[K, V](
try {
loadMetadataDetails.setPartitionCount(partitionID)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
carbonLoadModel.setPartitionId(partitionID)
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
carbonLoadModel.setPreFetch(false)
@@ -376,12 +375,12 @@ class NewDataFrameLoaderRDD[K, V](
executor.execute(model, loader.storeLocation, recordReaders.toArray)
} catch {
case e: NoRetryException =>
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
executionErrors.failureCauses = FailureCauses.BAD_RECORDS
executionErrors.errorMsg = e.getMessage
logInfo("Bad Record Found")
case e: Exception =>
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
logInfo("DataLoad failure", e)
LOGGER.error(e)
throw e
@@ -390,8 +389,7 @@ class NewDataFrameLoaderRDD[K, V](
CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
// in case of failure the same operation will be re-tried several times.
// So print the data load statistics only in case of non failure case
- if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- .equals(loadMetadataDetails.getLoadStatus)) {
+ if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance
.printStatisticsInfo(model.getPartitionId)
}
@@ -540,7 +538,7 @@ class PartitionTableDataLoaderRDD[K, V](
try {
loadMetadataDetails.setPartitionCount(partitionID)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
carbonLoadModel.setPartitionId(partitionID)
carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
carbonLoadModel.setPreFetch(false)
@@ -561,12 +559,12 @@ class PartitionTableDataLoaderRDD[K, V](
executor.execute(model, loader.storeLocation, recordReaders)
} catch {
case e: NoRetryException =>
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
executionErrors.failureCauses = FailureCauses.BAD_RECORDS
executionErrors.errorMsg = e.getMessage
logInfo("Bad Record Found")
case e: Exception =>
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
logInfo("DataLoad For Partition Table failure", e)
LOGGER.error(e)
throw e
@@ -575,8 +573,7 @@ class PartitionTableDataLoaderRDD[K, V](
CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
// in case of failure the same operation will be re-tried several times.
// So print the data load statistics only in case of non failure case
- if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- .equals(loadMetadataDetails.getLoadStatus)) {
+ if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance
.printStatisticsInfo(model.getPartitionId)
}
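
The three loader RDDs in this file share one status protocol: initialise the entry to SUCCESS, downgrade to LOAD_PARTIAL_SUCCESS on the bad-record NoRetryException, set LOAD_FAILURE and rethrow on any other exception, and print load statistics only when the final status is not LOAD_FAILURE. A condensed sketch of that protocol, with doLoad as a hypothetical stand-in for the DataLoadExecutor call and a println in place of the statistics output:

import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.processing.loading.exception.NoRetryException

object LoadStatusSketch {
  // Start optimistic, downgrade on bad records, fail hard on anything else.
  def loadWithStatus(details: LoadMetadataDetails)(doLoad: => Unit): Unit = {
    details.setSegmentStatus(SegmentStatus.SUCCESS)
    try {
      doLoad
    } catch {
      case _: NoRetryException =>
        // bad records only: the segment is still usable, so keep it as partial success
        details.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
      case e: Exception =>
        details.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
        throw e
    } finally {
      // statistics are reported only when the load did not fail outright
      if (SegmentStatus.LOAD_FAILURE != details.getSegmentStatus) {
        println("load statistics would be printed here")
      }
    }
  }
}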
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index fbe9377..600cd80 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -27,8 +27,6 @@ import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.indexstore.ExtendedBlocklet
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
@@ -69,8 +67,6 @@ class DataMapPruneRDD(sc: SparkContext,
override def internalCompute(split: Partition,
context: TaskContext): Iterator[ExtendedBlocklet] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index eb07240..b1dfc01 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.Row
import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.DataLoadExecutor
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -55,7 +55,7 @@ object UpdateDataLoad {
// Intialize to set carbon properties
loader.initialize()
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
new DataLoadExecutor().execute(carbonLoadModel,
loader.storeLocation,
recordReaders.toArray)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 84294ff..b51d0f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -39,7 +39,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.LoadStatusType
import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
@@ -48,7 +47,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.comparator.Comparator
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -518,11 +517,12 @@ object CommonUtil {
storePath: String,
insertOverwrite: Boolean): Unit = {
val newLoadMetaEntry = new LoadMetadataDetails
- val status: String = if (insertOverwrite) {
- LoadStatusType.INSERT_OVERWRITE.getMessage
+ val status = if (insertOverwrite) {
+ SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
} else {
- LoadStatusType.IN_PROGRESS.getMessage
+ SegmentStatus.INSERT_IN_PROGRESS
}
+
// reading the start time of data load.
val loadStartTime = CarbonUpdateUtil.readCurrentTime
model.setFactTimeStamp(loadStartTime)
@@ -548,7 +548,7 @@ object CommonUtil {
model: CarbonLoadModel): Unit = {
// in case if failure the load status should be "Marked for delete" so that it will be taken
// care during clean up
- val loadStatus = CarbonCommonConstants.MARKED_FOR_DELETE
+ val loadStatus = SegmentStatus.MARKED_FOR_DELETE
// always the last entry in the load metadata details will be the current load entry
val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1)
CarbonLoaderUtil
@@ -799,9 +799,9 @@ object CommonUtil {
var loadInprogressExist = false
val staleFolders: Seq[CarbonFile] = Seq()
listOfLoadFolderDetailsArray.foreach { load =>
- if (load.getLoadStatus.equals(LoadStatusType.IN_PROGRESS.getMessage) ||
- load.getLoadStatus.equals(LoadStatusType.INSERT_OVERWRITE.getMessage)) {
- load.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+ if (load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
+ load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+ load.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
staleFolders :+ FileFactory.getCarbonFile(
carbonTablePath.getCarbonDataDirectoryPath("0", load.getLoadName))
loadInprogressExist = true
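
Two conventions stand out in CommonUtil above: a new load entry starts in INSERT_IN_PROGRESS (or INSERT_OVERWRITE_IN_PROGRESS for an insert overwrite), and at recovery time any entry still in an in-progress state is flipped to MARKED_FOR_DELETE so its stale folder can be cleaned up. A minimal sketch of both rules, using only the LoadMetadataDetails accessors shown in the diff; the object and method names are invented for illustration:

import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}

object InsertStatusSketch {
  // Rule 1: a new load entry starts in an in-progress state.
  def initialStatus(insertOverwrite: Boolean): SegmentStatus =
    if (insertOverwrite) SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
    else SegmentStatus.INSERT_IN_PROGRESS

  // Rule 2: on recovery, anything still in progress is marked for deletion.
  def markStaleInProgress(loads: Seq[LoadMetadataDetails]): Boolean = {
    var staleFound = false
    loads.foreach { load =>
      if (load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
          load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
        load.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
        staleFound = true
      }
    }
    staleFound
  }
}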
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 840e8ae..3bb8d94 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
import org.apache.carbondata.core.reader.CarbonDictionaryReader
import org.apache.carbondata.core.service.CarbonCommonFactory
+import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.CarbonDictionaryWriter
@@ -388,12 +389,12 @@ object GlobalDictionaryUtil {
private def checkStatus(carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext,
model: DictionaryLoadModel,
- status: Array[(Int, String)]) = {
+ status: Array[(Int, SegmentStatus)]) = {
var result = false
val tableName = model.table.getTableName
status.foreach { x =>
val columnName = model.primDimensions(x._1).getColName
- if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(x._2)) {
+ if (SegmentStatus.LOAD_FAILURE == x._2) {
result = true
LOGGER.error(s"table:$tableName column:$columnName generate global dictionary file failed")
}
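
checkStatus above scans the per-column results for LOAD_FAILURE. A compact equivalent, kept purely as an illustration of the enum comparison rather than as a rewrite of the commit:

import org.apache.carbondata.core.statusmanager.SegmentStatus

object DictionaryResultSketch {
  // status holds (column index, per-column dictionary generation status) pairs.
  def anyDictionaryFailure(status: Array[(Int, SegmentStatus)]): Boolean =
    status.exists { case (_, s) => s == SegmentStatus.LOAD_FAILURE }
}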