You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/03/30 14:40:28 UTC
[2/2] carbondata git commit: [CARBONDATA-2287] events added for alter
hive partition table
[CARBONDATA-2287] events added for alter hive partition table
This closes #2119
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c1035307
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c1035307
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c1035307
Branch: refs/heads/branch-1.3
Commit: c10353070136aa8d06c1995100ae0505361c9b43
Parents: 4b07db4
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue Mar 27 12:20:04 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Fri Mar 30 20:13:02 2018 +0530
----------------------------------------------------------------------
.../filesystem/AbstractDFSCarbonFile.java | 3 ++
.../carbondata/core/util/CarbonProperties.java | 56 ++++++++++----------
.../apache/carbondata/core/util/CarbonUtil.java | 23 ++++++++
.../carbondata/events/AlterTableEvents.scala | 16 ++++++
.../org/apache/carbondata/events/Events.scala | 10 +++-
...arbonAlterTableAddHivePartitionCommand.scala | 14 ++++-
...rbonAlterTableDropHivePartitionCommand.scala | 12 +++++
.../CarbonAlterTableSplitPartitionCommand.scala | 16 +++++-
.../impl/MeasureFieldConverterImpl.java | 5 +-
.../store/CarbonFactDataHandlerModel.java | 30 +++++++++--
.../carbondata/streaming/StreamHandoffRDD.scala | 23 ++++----
11 files changed, 160 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 68eaa21..a4a92ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -467,6 +467,9 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
fs.create(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), false,
fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(path),
fs.getDefaultBlockSize(path), null).close();
+ // haddop masks the permission accoding to configured permission, so need to set permission
+ // forcefully
+ fs.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
return true;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a2ace69..e90a644 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -723,17 +723,17 @@ public final class CarbonProperties {
carbonProperties.load(fis);
}
} catch (FileNotFoundException e) {
- LOGGER.error(
+ LOGGER.warn(
"The file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH + " does not exist");
} catch (IOException e) {
- LOGGER.error(
+ LOGGER.warn(
"Error while reading the file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH);
} finally {
if (null != fis) {
try {
fis.close();
} catch (IOException e) {
- LOGGER.error("Error while closing the file stream for file: "
+ LOGGER.warn("Error while closing the file stream for file: "
+ CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH);
}
}
@@ -743,7 +743,7 @@ public final class CarbonProperties {
try {
initPropertySet();
} catch (IllegalAccessException e) {
- LOGGER.error("Illegal access to declared field" + e.getMessage());
+ LOGGER.warn("Illegal access to declared field" + e.getMessage());
}
}
@@ -872,7 +872,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER));
// checking min and max . 0 , 100 is min & max.
if (numberOfSegmentsToBePreserved < 0 || numberOfSegmentsToBePreserved > 100) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER + " is incorrect."
+ " Correct value should be in range of 0 -100. Taking the default value.");
numberOfSegmentsToBePreserved =
@@ -928,7 +928,7 @@ public final class CarbonProperties {
}
compactionSize[i++] = size;
} catch (NumberFormatException e) {
- LOGGER.error(
+ LOGGER.warn(
"Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD
+ " is not proper. Taking the default value "
+ CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
@@ -949,7 +949,7 @@ public final class CarbonProperties {
numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.NUM_CORES_LOADING));
} catch (NumberFormatException exc) {
- LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+ LOGGER.warn("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+ " is wrong. Falling back to the default value "
+ CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
@@ -970,18 +970,18 @@ public final class CarbonProperties {
} catch (Exception e) {
inMemoryChunkSizeInMB =
Integer.parseInt(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT);
- LOGGER.error("Problem in parsing the sort memory chunk size, setting with default value"
+ LOGGER.warn("Problem in parsing the sort memory chunk size, setting with default value"
+ inMemoryChunkSizeInMB);
}
if (inMemoryChunkSizeInMB > 1024) {
inMemoryChunkSizeInMB = 1024;
- LOGGER.error(
+ LOGGER.warn(
"It is not recommended to increase the sort memory chunk size more than 1024MB, "
+ "so setting the value to "
+ inMemoryChunkSizeInMB);
} else if (inMemoryChunkSizeInMB < 1) {
inMemoryChunkSizeInMB = 1;
- LOGGER.error(
+ LOGGER.warn(
"It is not recommended to decrease the sort memory chunk size less than 1MB, "
+ "so setting the value to "
+ inMemoryChunkSizeInMB);
@@ -1067,7 +1067,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
+ "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
@@ -1075,7 +1075,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
}
} catch (NumberFormatException e) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
numberOfDeltaFilesThreshold = Integer
@@ -1097,7 +1097,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
+ "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
@@ -1105,7 +1105,7 @@ public final class CarbonProperties {
CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
}
} catch (NumberFormatException e) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
numberOfDeltaFilesThreshold = Integer
@@ -1123,7 +1123,7 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT);
boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr);
if (!validateBoolean) {
- LOGGER.error("The carbon.use.multiple.temp.dir configuration value is invalid."
+ LOGGER.warn("The carbon.use.multiple.temp.dir configuration value is invalid."
+ "Configured value: \"" + usingMultiDirStr + "\"."
+ "Data Load will not use multiple temp directories.");
usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT;
@@ -1140,7 +1140,7 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT);
boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
if (!validateStorageLevel) {
- LOGGER.error("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL
+ LOGGER.warn("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL
+ " configuration value is invalid. It will use default storage level("
+ CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT
+ ") to persist rdd.");
@@ -1169,7 +1169,7 @@ public final class CarbonProperties {
}
if (isInvalidValue) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM
+ " is incorrect. Correct value should be in range of 0 - 1000."
+ " Taking the default value: "
@@ -1190,7 +1190,7 @@ public final class CarbonProperties {
CarbonCommonConstants.defaultValueIsPersistEnabled);
boolean validatePersistEnabled = CarbonUtil.validateBoolean(isPersistEnabled);
if (!validatePersistEnabled) {
- LOGGER.error("The " + CarbonCommonConstants.isPersistEnabled
+ LOGGER.warn("The " + CarbonCommonConstants.isPersistEnabled
+ " configuration value is invalid. It will use default value("
+ CarbonCommonConstants.defaultValueIsPersistEnabled
+ ").");
@@ -1208,7 +1208,7 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT);
boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
if (!validateStorageLevel) {
- LOGGER.error("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL
+ LOGGER.warn("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL
+ " configuration value is invalid. It will use default storage level("
+ CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT
+ ") to persist dataset.");
@@ -1228,7 +1228,7 @@ public final class CarbonProperties {
|| "BZIP2".equals(compressor) || "LZ4".equals(compressor)) {
return compressor;
} else {
- LOGGER.error("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
+ LOGGER.warn("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
.concat(" configuration value is invalid. Only snappy,gzip,bip2,lz4 and")
.concat(" empty are allowed. It will not compress the sort temp files by default"));
return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
@@ -1264,14 +1264,14 @@ public final class CarbonProperties {
sortMemorySizeInMB = Integer.parseInt(
carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB));
} catch (NumberFormatException e) {
- LOGGER.error(
+ LOGGER.warn(
"The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
+ "is Invalid." + " Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
sortMemorySizeInMB = sortMemorySizeInMBDefault;
}
if (sortMemorySizeInMB < sortMemorySizeInMBDefault) {
- LOGGER.error(
+ LOGGER.warn(
"The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
+ "is less than default value." + ". Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
@@ -1309,14 +1309,14 @@ public final class CarbonProperties {
unsafeWorkingMemory = Integer.parseInt(
carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB));
} catch (NumberFormatException e) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid."
+ " Taking the default value."
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
unsafeWorkingMemory = unsafeWorkingMemoryDefault;
}
if (unsafeWorkingMemory < unsafeWorkingMemoryDefault) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT
+ "is less than the default value." + ". Taking the default value."
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
@@ -1334,14 +1334,14 @@ public final class CarbonProperties {
unsafeSortStorageMemory = Integer.parseInt(carbonProperties
.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
} catch (NumberFormatException e) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid."
+ " Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
unsafeSortStorageMemory = unsafeSortStorageMemoryDefault;
}
if (unsafeSortStorageMemory < unsafeSortStorageMemoryDefault) {
- LOGGER.error("The specified value for property "
+ LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB
+ "is less than the default value." + " Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
@@ -1361,7 +1361,7 @@ public final class CarbonProperties {
.getProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES,
CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT));
} catch (NumberFormatException exc) {
- LOGGER.error(
+ LOGGER.warn(
"The heap memory pooling threshold bytes is invalid. Using the default value "
+ CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
thresholdSize = Integer.parseInt(
@@ -1382,7 +1382,7 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT));
preserveSeconds = preserveHours * 3600 * 1000L;
} catch (NumberFormatException exc) {
- LOGGER.error(
+ LOGGER.warn(
"The segment lock files preserv hours is invalid. Using the default value "
+ CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT);
preserveSeconds = Integer.parseInt(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 7f21229..cfa94f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -105,6 +105,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
@@ -846,6 +848,27 @@ public final class CarbonUtil {
}
/**
+ * This method will check and create the given path with 777 permission
+ */
+ public static boolean checkAndCreateFolderWithPermission(String path) {
+ boolean created = false;
+ try {
+ FileFactory.FileType fileType = FileFactory.getFileType(path);
+ if (FileFactory.isFileExist(path, fileType)) {
+ created = true;
+ } else {
+ FileFactory.createDirectoryAndSetPermission(path,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ created = true;
+ }
+ } catch (IOException e) {
+ LOGGER.error(e);
+ }
+ return created;
+ }
+
+
+ /**
* This method will return the size of a given file
*/
public static long getFileSize(String filePath) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 538df4a..7c4339f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -215,3 +215,19 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo
+
+/**
+ * pre event for standard hive partition
+ * @param sparkSession
+ * @param carbonTable
+ */
+case class PreAlterTableHivePartitionCommandEvent(sparkSession: SparkSession,
+ carbonTable: CarbonTable) extends Event with AlterTableHivePartitionInfo
+
+/**
+ * post event for standard hive partition
+ * @param sparkSession
+ * @param carbonTable
+ */
+case class PostAlterTableHivePartitionCommandEvent(sparkSession: SparkSession,
+ carbonTable: CarbonTable) extends Event with AlterTableHivePartitionInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 799d8c4..d85e8ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -100,7 +100,7 @@ trait AlterTableCompactionStatusUpdateEventInfo {
}
/**
- * event for alter_table_compaction
+ * event info for alter_table_compaction
*/
trait AlterTableCompactionEventInfo {
val sparkSession: SparkSession
@@ -108,6 +108,14 @@ trait AlterTableCompactionEventInfo {
}
/**
+ * event for alter table standard hive partition
+ */
+trait AlterTableHivePartitionInfo {
+ val sparkSession: SparkSession
+ val carbonTable: CarbonTable
+}
+
+/**
* event for DeleteSegmentById
*/
trait DeleteSegmentbyIdEventInfo {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index b0e6b94..b583c6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -68,7 +69,18 @@ case class CarbonAlterTableAddHivePartitionCommand(
currParts.exists(p => part.equals(p))
}.asJava)
}
+ val operationContext = new OperationContext
+ val preAlterTableHivePartitionCommandEvent = new PreAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(sparkSession)
+ val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
}
Seq.empty[Row]
}
@@ -113,7 +125,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
newMetaEntry.setSegmentFile(segmentFileName)
val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
- CarbonUtil.checkAndCreateFolder(segmentsLoc)
+ CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
CarbonLoaderUtil.populateNewLoadMetaEntry(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 407057e..c67d694 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
/**
@@ -89,6 +90,12 @@ case class CarbonAlterTableDropHivePartitionCommand(
partition.location)
}
carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava)
+ val operationContext = new OperationContext
+ val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
// Drop the partitions from hive.
AlterTableDropPartitionCommand(
tableName,
@@ -96,6 +103,11 @@ case class CarbonAlterTableDropHivePartitionCommand(
ifExists,
purge,
retainData).run(sparkSession)
+ val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
} catch {
case e: Exception =>
if (!ifExists) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 78cf2b8..914b39a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.command.partition
import java.text.SimpleDateFormat
import java.util
-import java.util.concurrent.{Executors, ExecutorService, Future}
+import java.util.concurrent.{ExecutorService, Executors, Future}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.CarbonRelation
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.partition.SplitPartitionCallable
@@ -153,12 +154,23 @@ case class CarbonAlterTableSplitPartitionCommand(
carbonLoadModel.setTablePath(tablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
+ val operationContext = new OperationContext
+ val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
alterTableSplitPartition(
sparkSession.sqlContext,
splitPartitionModel.partitionId.toInt.toString,
carbonLoadModel,
oldPartitionIds.asScala.toList
)
+ val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+ sparkSession,
+ table)
+ OperationListenerBus.getInstance()
+ .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
success = true
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 6664a2c..724a312 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -95,8 +95,9 @@ public class MeasureFieldConverterImpl implements FieldConverter {
}
row.update(output, index);
} catch (NumberFormatException e) {
- LOGGER.warn(
- "Cant not convert value to Numeric type value. Value considered as null.");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Can not convert value to Numeric type value. Value considered as null.");
+ }
logHolder.setReason(
CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
output = null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d77fcab..bdeb465 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -307,7 +308,7 @@ public class CarbonFactDataHandlerModel {
measureDataTypes[i++] = msr.getDataType();
}
carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
- CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+ CarbonUtil.checkAndCreateFolderWithPermission(carbonDataDirectoryPath);
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -332,9 +333,32 @@ public class CarbonFactDataHandlerModel {
* @return data directory path
*/
private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
+ // configuration.getDataWritePath will not be null only in case of partition
if (configuration.getDataWritePath() != null) {
- CarbonUtil.checkAndCreateFolder(configuration.getDataWritePath());
- return configuration.getDataWritePath();
+ String paths = configuration.getDataWritePath();
+ AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
+ String partPath = absoluteTableIdentifier.getTablePath();
+ String[] dirs = paths.split(partPath);
+ /* it will create folder one by one and apply the permissions
+ else creation of folder in one go will set the permission for last directory only
+ e.g. paths="/home/rahul/Documents/store/carbonTable1/emp_name=rahul/loc=india/dept=rd"
+ So, dirs={"","/emp_name=rahul/loc=india/dept=rd"}
+ if (dirs.length > 1) then partDirs ={"","emp_name=rahul","loc=india","dept=rd"}
+ forEach partDirs partpath(say "/home/rahul/Documents/store/carbonTable1") will
+ be keep appending with "emp_name=rahul","loc=india","dept=rd" sequentially
+ */
+ if (dirs.length > 1) {
+ String[] partDirs = dirs[1].split(CarbonCommonConstants.FILE_SEPARATOR);
+ for (String partDir : partDirs) {
+ if (!partDir.isEmpty()) {
+ partPath = partPath.concat(CarbonCommonConstants.FILE_SEPARATOR + partDir);
+ CarbonUtil.checkAndCreateFolderWithPermission(partPath);
+ }
+ }
+ } else {
+ CarbonUtil.checkAndCreateFolderWithPermission(paths);
+ }
+ return paths;
}
AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1035307/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index a46ced5..366a1ba 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -314,18 +314,7 @@ object StreamHandoffRDD {
SegmentStatus.INSERT_IN_PROGRESS,
carbonLoadModel.getFactTimeStamp,
false)
- val operationContext = new OperationContext()
- val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
- new LoadTablePreStatusUpdateEvent(
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
- carbonLoadModel)
- OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
- val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
- new LoadTablePostStatusUpdateEvent(carbonLoadModel)
- OperationListenerBus.getInstance()
- .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
// convert a streaming segment to columnar segment
val status = new StreamHandoffRDD(
sparkSession.sparkContext,
@@ -358,7 +347,19 @@ object StreamHandoffRDD {
}
if (loadStatus == SegmentStatus.SUCCESS) {
+ val operationContext = new OperationContext()
+ val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+ new LoadTablePreStatusUpdateEvent(
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
+ carbonLoadModel)
+ OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
+
+ val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+ new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+ OperationListenerBus.getInstance()
+ .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
if (!done) {
val errorMessage = "Handoff failed due to failure in table status updation."
LOGGER.audit("Handoff is failed for " +