You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/03/28 14:21:38 UTC
carbondata git commit: [CARBONDATA-2287] Events added for alter hive
partition table
Repository: carbondata
Updated Branches:
refs/heads/master 05086e536 -> 877eabdd6
[CARBONDATA-2287] Events added for alter hive partition table
Events added for alter hive partition table
This closes #2107
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/877eabdd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/877eabdd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/877eabdd
Branch: refs/heads/master
Commit: 877eabdd6080c514e23a9cbfbaef9f78acc0d39f
Parents: 05086e5
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue Mar 27 12:20:04 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Mar 28 19:54:39 2018 +0530
----------------------------------------------------------------------
.../filesystem/AbstractDFSCarbonFile.java | 3 ++
.../carbondata/core/util/CarbonProperties.java | 50 ++++++++++----------
.../apache/carbondata/core/util/CarbonUtil.java | 24 ++++++++++
.../carbondata/events/AlterTableEvents.scala | 16 +++++++
.../org/apache/carbondata/events/Events.scala | 10 +++-
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 23 ++++-----
...arbonAlterTableAddHivePartitionCommand.scala | 14 +++++-
...rbonAlterTableDropHivePartitionCommand.scala | 12 +++++
.../CarbonAlterTableSplitPartitionCommand.scala | 12 +++++
.../impl/MeasureFieldConverterImpl.java | 5 +-
.../store/CarbonFactDataHandlerModel.java | 30 ++++++++++--
11 files changed, 156 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 8cf3efe..bf3292b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -489,6 +489,9 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
fs.create(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), false,
fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(path),
fs.getDefaultBlockSize(path), null).close();
+ // hadoop masks the permission according to configured permission, so need to set permission
+ // forcefully
+ fs.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
return true;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 6fa21bc..38f7513 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -874,7 +874,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER));
// checking min and max . 0 , 100 is min & max.
if (numberOfSegmentsToBePreserved < 0 || numberOfSegmentsToBePreserved > 100) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER + " is incorrect."
+ " Correct value should be in range of 0 -100. Taking the default value.");
numberOfSegmentsToBePreserved =
@@ -930,7 +930,7 @@ public final class CarbonProperties {
}
compactionSize[i++] = size;
} catch (NumberFormatException e) {
- LOGGER.error(
+ LOGGER.warn(
"Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD
+ " is not proper. Taking the default value "
+ CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
@@ -953,7 +953,7 @@ public final class CarbonProperties {
CarbonCommonConstants.NUM_CORES_LOADING,
CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
} catch (NumberFormatException exc) {
- LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+ LOGGER.warn("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+ " is wrong. Falling back to the default value "
+ CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
@@ -974,18 +974,18 @@ public final class CarbonProperties {
} catch (Exception e) {
inMemoryChunkSizeInMB =
Integer.parseInt(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT);
- LOGGER.error("Problem in parsing the sort memory chunk size, setting with default value"
+ LOGGER.warn("Problem in parsing the sort memory chunk size, setting with default value"
+ inMemoryChunkSizeInMB);
}
if (inMemoryChunkSizeInMB > 1024) {
inMemoryChunkSizeInMB = 1024;
- LOGGER.error(
+ LOGGER.warn(
"It is not recommended to increase the sort memory chunk size more than 1024MB, "
+ "so setting the value to "
+ inMemoryChunkSizeInMB);
} else if (inMemoryChunkSizeInMB < 1) {
inMemoryChunkSizeInMB = 1;
- LOGGER.error(
+ LOGGER.warn(
"It is not recommended to decrease the sort memory chunk size less than 1MB, "
+ "so setting the value to "
+ inMemoryChunkSizeInMB);
@@ -1071,7 +1071,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
+ "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
@@ -1079,7 +1079,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
}
} catch (NumberFormatException e) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
numberOfDeltaFilesThreshold = Integer
@@ -1101,7 +1101,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
+ "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
@@ -1109,7 +1109,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
}
} catch (NumberFormatException e) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
numberOfDeltaFilesThreshold = Integer
@@ -1127,7 +1127,7 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT);
boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr);
if (!validateBoolean) {
- LOGGER.error("The carbon.use.multiple.temp.dir configuration value is invalid."
+ LOGGER.warn("The carbon.use.multiple.temp.dir configuration value is invalid."
+ "Configured value: \"" + usingMultiDirStr + "\"."
+ "Data Load will not use multiple temp directories.");
usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT;
@@ -1144,7 +1144,7 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT);
boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
if (!validateStorageLevel) {
- LOGGER.error("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL
+ LOGGER.warn("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL
+ " configuration value is invalid. It will use default storage level("
+ CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT
+ ") to persist rdd.");
@@ -1173,7 +1173,7 @@ public final class CarbonProperties {
}
if (isInvalidValue) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM
+ " is incorrect. Correct value should be in range of 0 - 1000."
+ " Taking the default value: "
@@ -1194,7 +1194,7 @@ public final class CarbonProperties {
CarbonCommonConstants.defaultValueIsPersistEnabled);
boolean validatePersistEnabled = CarbonUtil.validateBoolean(isPersistEnabled);
if (!validatePersistEnabled) {
- LOGGER.error("The " + CarbonCommonConstants.isPersistEnabled
+ LOGGER.warn("The " + CarbonCommonConstants.isPersistEnabled
+ " configuration value is invalid. It will use default value("
+ CarbonCommonConstants.defaultValueIsPersistEnabled
+ ").");
@@ -1212,7 +1212,7 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT);
boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
if (!validateStorageLevel) {
- LOGGER.error("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL
+ LOGGER.warn("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL
+ " configuration value is invalid. It will use default storage level("
+ CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT
+ ") to persist dataset.");
@@ -1232,7 +1232,7 @@ public final class CarbonProperties {
|| "BZIP2".equals(compressor) || "LZ4".equals(compressor)) {
return compressor;
} else {
- LOGGER.error("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
+ LOGGER.warn("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
.concat(" configuration value is invalid. Only snappy,gzip,bip2,lz4 and")
.concat(" empty are allowed. It will not compress the sort temp files by default"));
return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
@@ -1279,14 +1279,14 @@ public final class CarbonProperties {
sortMemorySizeInMB = Integer.parseInt(
carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB));
} catch (NumberFormatException e) {
- LOGGER.error(
+ LOGGER.warn(
"The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
+ "is Invalid." + " Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
sortMemorySizeInMB = sortMemorySizeInMBDefault;
}
if (sortMemorySizeInMB < sortMemorySizeInMBDefault) {
- LOGGER.error(
+ LOGGER.warn(
"The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
+ "is less than default value." + ". Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
@@ -1324,14 +1324,14 @@ public final class CarbonProperties {
unsafeWorkingMemory = Integer.parseInt(
carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB));
} catch (NumberFormatException e) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid."
+ " Taking the default value."
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
unsafeWorkingMemory = unsafeWorkingMemoryDefault;
}
if (unsafeWorkingMemory < unsafeWorkingMemoryDefault) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT
+ "is less than the default value." + ". Taking the default value."
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
@@ -1349,14 +1349,14 @@ public final class CarbonProperties {
unsafeSortStorageMemory = Integer.parseInt(carbonProperties
.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
} catch (NumberFormatException e) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid."
+ " Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
unsafeSortStorageMemory = unsafeSortStorageMemoryDefault;
}
if (unsafeSortStorageMemory < unsafeSortStorageMemoryDefault) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB
+ "is less than the default value." + " Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
@@ -1397,7 +1397,7 @@ public final class CarbonProperties {
.getProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES,
CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT));
} catch (NumberFormatException exc) {
- LOGGER.error(
+ LOGGER.warn(
"The heap memory pooling threshold bytes is invalid. Using the default value "
+ CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
thresholdSize = Integer.parseInt(
@@ -1418,7 +1418,7 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT));
preserveSeconds = preserveHours * 3600 * 1000L;
} catch (NumberFormatException exc) {
- LOGGER.error(
+ LOGGER.warn(
"The value of '" + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS
+ "' is invalid. Using the default value "
+ CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT);
@@ -1438,7 +1438,7 @@ public final class CarbonProperties {
.getProperty(CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT,
CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT));
} catch (NumberFormatException exc) {
- LOGGER.error(
+ LOGGER.warn(
"The value of '" + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT
+ "' is invalid. Using the default value "
+ CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 1082d78..3c347db 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -99,6 +99,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TBase;
@@ -846,6 +848,28 @@ public final class CarbonUtil {
}
/**
+ *
+ * This method will check and create the given path with 777 permission
+ */
+ public static boolean checkAndCreateFolderWithPermission(String path) {
+ boolean created = false;
+ try {
+ FileFactory.FileType fileType = FileFactory.getFileType(path);
+ if (FileFactory.isFileExist(path, fileType)) {
+ created = true;
+ } else {
+ FileFactory.createDirectoryAndSetPermission(path,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ created = true;
+ }
+ } catch (IOException e) {
+ LOGGER.error(e);
+ }
+ return created;
+ }
+
+
+ /**
* This method will return the size of a given file
*/
public static long getFileSize(String filePath) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 538df4a..7c4339f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -215,3 +215,19 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo
+
+/**
+ * pre event for standard hive partition
+ * @param sparkSession
+ * @param carbonTable
+ */
+case class PreAlterTableHivePartitionCommandEvent(sparkSession: SparkSession,
+ carbonTable: CarbonTable) extends Event with AlterTableHivePartitionInfo
+
+/**
+ * post event for standard hive partition
+ * @param sparkSession
+ * @param carbonTable
+ */
+case class PostAlterTableHivePartitionCommandEvent(sparkSession: SparkSession,
+ carbonTable: CarbonTable) extends Event with AlterTableHivePartitionInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 799d8c4..d85e8ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -100,7 +100,7 @@ trait AlterTableCompactionStatusUpdateEventInfo {
}
/**
- * event for alter_table_compaction
+ * event info for alter_table_compaction
*/
trait AlterTableCompactionEventInfo {
val sparkSession: SparkSession
@@ -108,6 +108,14 @@ trait AlterTableCompactionEventInfo {
}
/**
+ * event for alter table standard hive partition
+ */
+trait AlterTableHivePartitionInfo {
+ val sparkSession: SparkSession
+ val carbonTable: CarbonTable
+}
+
+/**
* event for DeleteSegmentById
*/
trait DeleteSegmentbyIdEventInfo {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index bf5f660..50102f1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -311,18 +311,7 @@ object StreamHandoffRDD {
SegmentStatus.INSERT_IN_PROGRESS,
carbonLoadModel.getFactTimeStamp,
false)
- val operationContext = new OperationContext()
- val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
- new LoadTablePreStatusUpdateEvent(
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
- carbonLoadModel)
- OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
- val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
- new LoadTablePostStatusUpdateEvent(carbonLoadModel)
- OperationListenerBus.getInstance()
- .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
// convert a streaming segment to columnar segment
val status = new StreamHandoffRDD(
sparkSession.sparkContext,
@@ -355,7 +344,19 @@ object StreamHandoffRDD {
}
if (loadStatus == SegmentStatus.SUCCESS) {
+ val operationContext = new OperationContext()
+ val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+ new LoadTablePreStatusUpdateEvent(
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
+ carbonLoadModel)
+ OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
+
+ val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+ new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+ OperationListenerBus.getInstance()
+ .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
if (!done) {
val errorMessage = "Handoff failed due to failure in table status updation."
LOGGER.audit("Handoff is failed for " +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index b0e6b94..b583c6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -68,7 +69,18 @@ case class CarbonAlterTableAddHivePartitionCommand(
currParts.exists(p => part.equals(p))
}.asJava)
}
+ val operationContext = new OperationContext
+ val preAlterTableHivePartitionCommandEvent = new PreAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(sparkSession)
+ val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
}
Seq.empty[Row]
}
@@ -113,7 +125,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
newMetaEntry.setSegmentFile(segmentFileName)
val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
- CarbonUtil.checkAndCreateFolder(segmentsLoc)
+ CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
CarbonLoaderUtil.populateNewLoadMetaEntry(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 407057e..c67d694 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
/**
@@ -89,6 +90,12 @@ case class CarbonAlterTableDropHivePartitionCommand(
partition.location)
}
carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava)
+ val operationContext = new OperationContext
+ val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
// Drop the partitions from hive.
AlterTableDropPartitionCommand(
tableName,
@@ -96,6 +103,11 @@ case class CarbonAlterTableDropHivePartitionCommand(
ifExists,
purge,
retainData).run(sparkSession)
+ val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
} catch {
case e: Exception =>
if (!ifExists) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 4b89296..1bdf414 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.partition.SplitPartitionCallable
@@ -151,12 +152,23 @@ case class CarbonAlterTableSplitPartitionCommand(
carbonLoadModel.setTablePath(tablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
+ val operationContext = new OperationContext
+ val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
alterTableSplitPartition(
sparkSession.sqlContext,
splitPartitionModel.partitionId.toInt.toString,
carbonLoadModel,
oldPartitionIds.asScala.toList
)
+ val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
success = true
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 6664a2c..724a312 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -95,8 +95,9 @@ public class MeasureFieldConverterImpl implements FieldConverter {
}
row.update(output, index);
} catch (NumberFormatException e) {
- LOGGER.warn(
- "Cant not convert value to Numeric type value. Value considered as null.");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Can not convert value to Numeric type value. Value considered as null.");
+ }
logHolder.setReason(
CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
output = null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 8aa5bde..1d892e0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -308,7 +309,7 @@ public class CarbonFactDataHandlerModel {
measureDataTypes[i++] = msr.getDataType();
}
carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
- CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+ CarbonUtil.checkAndCreateFolderWithPermission(carbonDataDirectoryPath);
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -337,9 +338,32 @@ public class CarbonFactDataHandlerModel {
* @return data directory path
*/
private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
+ // configuration.getDataWritePath will not be null only in case of partition
if (configuration.getDataWritePath() != null) {
- CarbonUtil.checkAndCreateFolder(configuration.getDataWritePath());
- return configuration.getDataWritePath();
+ String paths = configuration.getDataWritePath();
+ AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
+ String partPath = absoluteTableIdentifier.getTablePath();
+ String[] dirs = paths.split(partPath);
+ /* it will create folder one by one and apply the permissions
+ else creation of folder in one go will set the permission for last directory only
+ e.g. paths="/home/rahul/Documents/store/carbonTable1/emp_name=rahul/loc=india/dept=rd"
+ So, dirs={"","/emp_name=rahul/loc=india/dept=rd"}
+ if (dirs.length > 1) then partDirs ={"","emp_name=rahul","loc=india","dept=rd"}
+ forEach partDirs partpath(say "/home/rahul/Documents/store/carbonTable1") will
+ be keep appending with "emp_name=rahul","loc=india","dept=rd" sequentially
+ */
+ if (dirs.length > 1) {
+ String[] partDirs = dirs[1].split(CarbonCommonConstants.FILE_SEPARATOR);
+ for (String partDir : partDirs) {
+ if (!partDir.isEmpty()) {
+ partPath = partPath.concat(CarbonCommonConstants.FILE_SEPARATOR + partDir);
+ CarbonUtil.checkAndCreateFolderWithPermission(partPath);
+ }
+ }
+ } else {
+ CarbonUtil.checkAndCreateFolderWithPermission(paths);
+ }
+ return paths;
}
AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
String carbonDataDirectoryPath = CarbonTablePath