You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:19 UTC
[12/50] [abbrv] carbondata git commit: [CARBONDATA-2012] Add support
to load pre-aggregate in one transaction
[CARBONDATA-2012] Add support to load pre-aggregate in one transaction
Currently, if a table (t1) has 2 pre-aggregate tables (p1, p2), then while loading, all the pre-aggregate tables are committed (table status writing) and then the parent table is committed.
After this PR the flow would be like this:
load t1
load p1
load p2
write table status for p2 with transactionID
write table status for p1 with transactionID
rename tablestatus_UUID to tablestatus for p2
rename tablestatus_UUID to tablestatus for p1
write table status for t1
This closes #1781
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d680e9cf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d680e9cf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d680e9cf
Branch: refs/heads/branch-1.3
Commit: d680e9cf5016475e6e9b320c27be6503e1c6e66c
Parents: c9a02fc
Author: kunal642 <ku...@gmail.com>
Authored: Mon Jan 15 14:35:56 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Feb 1 14:42:05 2018 +0530
----------------------------------------------------------------------
.../datastore/filesystem/LocalCarbonFile.java | 2 +-
.../statusmanager/SegmentStatusManager.java | 29 ++-
.../core/util/path/CarbonTablePath.java | 8 +
.../hadoop/api/CarbonOutputCommitter.java | 4 +
.../carbondata/events/AlterTableEvents.scala | 10 +
.../spark/rdd/AggregateDataMapCompactor.scala | 31 ++-
.../spark/rdd/CarbonDataRDDFactory.scala | 37 +++-
.../spark/rdd/CarbonTableCompactor.scala | 33 ++-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 4 +-
.../management/CarbonLoadDataCommand.scala | 25 ++-
.../CreatePreAggregateTableCommand.scala | 7 +-
.../preaaggregate/PreAggregateListeners.scala | 220 +++++++++++++++++--
.../preaaggregate/PreAggregateUtil.scala | 35 +--
.../processing/loading/events/LoadEvents.java | 13 ++
.../processing/util/CarbonLoaderUtil.java | 49 ++++-
15 files changed, 431 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 4ce78be..5df5a81 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -233,7 +233,7 @@ public class LocalCarbonFile implements CarbonFile {
@Override public boolean renameForce(String changetoName) {
File destFile = new File(changetoName);
- if (destFile.exists()) {
+ if (destFile.exists() && !file.getAbsolutePath().equals(destFile.getAbsolutePath())) {
if (destFile.delete()) {
return file.renameTo(new File(changetoName));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 6af0304..01f810e 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -178,23 +178,42 @@ public class SegmentStatusManager {
* @return
*/
public static LoadMetadataDetails[] readLoadMetadata(String metadataFolderPath) {
+ String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.LOADMETADATA_FILENAME;
+ return readTableStatusFile(metadataFileName);
+ }
+
+ /**
+ * Reads the table status file with the specified UUID if non empty.
+ */
+ public static LoadMetadataDetails[] readLoadMetadata(String metaDataFolderPath, String uuid) {
+ String tableStatusFileName;
+ if (uuid.isEmpty()) {
+ tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.LOADMETADATA_FILENAME;
+ } else {
+ tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.LOADMETADATA_FILENAME + CarbonCommonConstants.UNDERSCORE + uuid;
+ }
+ return readTableStatusFile(tableStatusFileName);
+ }
+
+ public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath) {
Gson gsonObjectToRead = new Gson();
DataInputStream dataInputStream = null;
BufferedReader buffReader = null;
InputStreamReader inStream = null;
- String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.LOADMETADATA_FILENAME;
LoadMetadataDetails[] listOfLoadFolderDetailsArray;
AtomicFileOperations fileOperation =
- new AtomicFileOperationsImpl(metadataFileName, FileFactory.getFileType(metadataFileName));
+ new AtomicFileOperationsImpl(tableStatusPath, FileFactory.getFileType(tableStatusPath));
try {
- if (!FileFactory.isFileExist(metadataFileName, FileFactory.getFileType(metadataFileName))) {
+ if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
return new LoadMetadataDetails[0];
}
dataInputStream = fileOperation.openForRead();
inStream = new InputStreamReader(dataInputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
buffReader = new BufferedReader(inStream);
listOfLoadFolderDetailsArray =
gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 9e66657..fab6289 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -252,6 +252,14 @@ public class CarbonTablePath extends Path {
return getMetaDataDir() + File.separator + TABLE_STATUS_FILE;
}
+ public String getTableStatusFilePathWithUUID(String uuid) {
+ if (!uuid.isEmpty()) {
+ return getTableStatusFilePath() + CarbonCommonConstants.UNDERSCORE + uuid;
+ } else {
+ return getTableStatusFilePath();
+ }
+ }
+
/**
* Gets absolute path of data file
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index f6e928d..9cca1bb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -115,8 +115,12 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
LoadEvents.LoadTablePreStatusUpdateEvent event =
new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
loadModel);
+ LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
+ new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);
try {
OperationListenerBus.getInstance().fireEvent(event, (OperationContext) operationContext);
+ OperationListenerBus.getInstance().fireEvent(postStatusUpdateEvent,
+ (OperationContext) operationContext);
} catch (Exception e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 30e3f6f..ca1948a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -182,6 +182,16 @@ case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession,
mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo
/**
+ * Compaction Event for handling post update status file operations, like committing child
+ * datamaps in one transaction
+ */
+case class AlterTableCompactionPostStatusUpdateEvent(
+ carbonTable: CarbonTable,
+ carbonMergerMapping: CarbonMergerMapping,
+ carbonLoadModel: CarbonLoadModel,
+ mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo
+
+/**
* Compaction Event for handling clean up in case of any compaction failure and abort the
* operation, listener has to implement this event to handle failure scenarios
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 5f8f389..188e776 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.OperationContext
@@ -61,6 +62,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
val loadCommand = operationContext.getProperty(carbonTable.getTableName + "_Compaction")
.asInstanceOf[CarbonLoadDataCommand]
+ val uuid = Option(loadCommand.operationContext.getProperty("uuid")).getOrElse("").toString
try {
val newInternalOptions = loadCommand.internalOptions ++
Map("mergedSegmentName" -> mergedLoadName)
@@ -70,7 +72,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
sqlContext.sparkSession, loadCommand.logicalPlan.get))
loadCommand.processData(sqlContext.sparkSession)
val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
- carbonTable.getMetaDataFilepath)
+ carbonTable.getMetaDataFilepath, uuid)
val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
case load if loadMetaDataDetails.contains(load) =>
load.setMergedLoadName(mergedLoadName)
@@ -83,16 +85,37 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
.getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
.getAbsoluteTableIdentifier)
SegmentStatusManager
- .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
+ .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePathWithUUID(uuid),
updatedLoadMetaDataDetails)
carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
} finally {
// check if any other segments need compaction in case of MINOR_COMPACTION.
// For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshold
// allows it.
+ // Also as the load which will be fired for 2nd level compaction will read the
+ // tablestatus file and not the tablestatus_UUID therefore we have to commit the
+ // intermediate tablestatus file for 2nd level compaction to be successful.
+ // This is required because:
+ // 1. after doing 12 loads and a compaction after every 4 loads the table status file will
+ // have 0.1, 4.1, 8, 9, 10, 11 as Success segments. While tablestatus_UUID will have
+ // 0.1, 4.1, 8.1.
+ // 2. Now for 2nd level compaction 0.1, 8.1, 4.1 have to be merged to 0.2. therefore we
+ // need to read the tablestatus_UUID. But load flow should always read tablestatus file
+ // because it contains the actual In-Process status for the segments.
+ // 3. If we read the tablestatus then 8, 9, 10, 11 will keep getting compacted into 8.1.
+ // 4. Therefore tablestatus file will be committed in between multiple commits.
if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
-
- executeCompaction()
+ if (!identifySegmentsToBeMerged().isEmpty) {
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ .getAbsoluteTableIdentifier)
+ val uuidTableStaus = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+ val tableStatus = carbonTablePath.getTableStatusFilePath
+ if (!uuidTableStaus.equalsIgnoreCase(tableStatus)) {
+ FileFactory.getCarbonFile(uuidTableStaus).renameForce(tableStatus)
+ }
+ executeCompaction()
+ }
}
CarbonSession
.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8212e85..3de0e70 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.CarbonException
@@ -62,7 +63,7 @@ import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.loading.sort.SortScopeOptions
@@ -491,9 +492,10 @@ object CarbonDataRDDFactory {
}
return
}
+ val uniqueTableStatusId = operationContext.getProperty("uuid").asInstanceOf[String]
if (loadStatus == SegmentStatus.LOAD_FAILURE) {
// update the load entry in table status file for changing the status to marked for delete
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
@@ -508,7 +510,7 @@ object CarbonDataRDDFactory {
status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
// update the load entry in table status file for changing the status to marked for delete
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
@@ -532,6 +534,8 @@ object CarbonDataRDDFactory {
}
writeDictionary(carbonLoadModel, result, writeAll = false)
+ operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
+ carbonLoadModel.getSegmentId)
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
new LoadTablePreStatusUpdateEvent(
carbonTable.getCarbonTableIdentifier,
@@ -543,9 +547,21 @@ object CarbonDataRDDFactory {
carbonLoadModel,
loadStatus,
newEntryLoadStatus,
- overwriteTable)
- if (!done) {
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+ overwriteTable,
+ uniqueTableStatusId)
+ val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+ new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+ val commitComplete = try {
+ OperationListenerBus.getInstance()
+ .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
+ true
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, "Problem while committing data maps")
+ false
+ }
+ if (!done && !commitComplete) {
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
@@ -731,7 +747,8 @@ object CarbonDataRDDFactory {
operationContext: OperationContext): Unit = {
LOGGER.info(s"compaction need status is" +
s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
- if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
+ if (!carbonTable.isChildDataMap &&
+ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
LOGGER.audit(s"Compaction request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val compactionSize = 0
@@ -805,7 +822,8 @@ object CarbonDataRDDFactory {
carbonLoadModel: CarbonLoadModel,
loadStatus: SegmentStatus,
newEntryLoadStatus: SegmentStatus,
- overwriteTable: Boolean): Boolean = {
+ overwriteTable: Boolean,
+ uuid: String = ""): Boolean = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
status(0)._2._1
@@ -820,7 +838,7 @@ object CarbonDataRDDFactory {
CarbonLoaderUtil
.addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
- overwriteTable)
+ overwriteTable, uuid)
if (!done) {
val errorMessage = s"Dataload failed due to failure in table status updation for" +
s" ${carbonLoadModel.getTableName}"
@@ -835,7 +853,6 @@ object CarbonDataRDDFactory {
done
}
-
/**
* repartition the input data for partition table.
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a0c8f65..8406d8d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -32,10 +32,12 @@ import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER
import org.apache.carbondata.spark.util.CommonUtil
/**
@@ -245,8 +247,33 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
CarbonDataMergerUtil
.updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
mergedLoadNumber, carbonLoadModel, compactionType)
-
- if (!statusFileUpdation) {
+ val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable,
+ carbonMergerMapping,
+ carbonLoadModel,
+ mergedLoadName)
+ // Used to inform the commit listener that the commit is fired from compaction flow.
+ operationContext.setProperty("isCompaction", "true")
+ val commitComplete = try {
+ // Once main table compaction is done and 0.1, 4.1, 8.1 is created commit will happen for
+ // all the tables. The commit listener will compact the child tables until no more segments
+ // are left. But 2nd level compaction is yet to happen on the main table therefore again the
+ // compaction flow will try to commit the child tables which is wrong. This check tells the
+ // 2nd level compaction flow that the commit for datamaps is already done.
+ val isCommitDone = operationContext.getProperty("commitComplete")
+ if (isCommitDone != null) {
+ isCommitDone.toString.toBoolean
+ } else {
+ OperationListenerBus.getInstance()
+ .fireEvent(compactionLoadStatusPostEvent, operationContext)
+ true
+ }
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, "Problem while committing data maps")
+ false
+ }
+ operationContext.setProperty("commitComplete", commitComplete)
+ if (!statusFileUpdation && !commitComplete) {
LOGGER.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 870b1f3..40035ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util._
import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.spark.rdd.SparkReadSupport
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -148,6 +148,8 @@ object CarbonEnv {
AlterPreAggregateTableCompactionPostListener)
.addListener(classOf[LoadMetadataEvent], LoadProcessMetaListener)
.addListener(classOf[LoadMetadataEvent], CompactionProcessMetaListener)
+ .addListener(classOf[LoadTablePostStatusUpdateEvent], CommitPreAggregateListener)
+ .addListener(classOf[AlterTableCompactionPostStatusUpdateEvent], CommitPreAggregateListener)
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 226a625..8e6c20e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.management
import java.text.SimpleDateFormat
import java.util
+import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -35,8 +36,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
@@ -119,6 +120,7 @@ case class CarbonLoadDataCommand(
}
Seq.empty
}
+
override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
@@ -176,7 +178,18 @@ case class CarbonLoadDataCommand(
LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
var isUpdateTableStatusRequired = false
+ // if the table is child then extract the uuid from the operation context and the parent would
+ // already generated UUID.
+ // if parent table then generate a new UUID else use empty.
+ val uuid = if (table.isChildDataMap) {
+ Option(operationContext.getProperty("uuid")).getOrElse("").toString
+ } else if (table.hasAggregationDataMap) {
+ UUID.randomUUID().toString
+ } else {
+ ""
+ }
try {
+ operationContext.setProperty("uuid", uuid)
val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
new LoadTablePreExecutionEvent(
table.getCarbonTableIdentifier,
@@ -194,7 +207,9 @@ case class CarbonLoadDataCommand(
DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
// add the start entry for the new load in the table status file
if (updateModel.isEmpty && !table.isHivePartitionTable) {
- CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
+ CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+ carbonLoadModel,
+ isOverwriteTable)
isUpdateTableStatusRequired = true
}
if (isOverwriteTable) {
@@ -252,7 +267,7 @@ case class CarbonLoadDataCommand(
case CausedBy(ex: NoRetryException) =>
// update the load entry in table status file for changing the status to marked for delete
if (isUpdateTableStatusRequired) {
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
@@ -263,7 +278,7 @@ case class CarbonLoadDataCommand(
LOGGER.error(ex)
// update the load entry in table status file for changing the status to marked for delete
if (isUpdateTableStatusRequired) {
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
throw ex
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index dbbf90c..3de75c2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -205,11 +205,12 @@ case class CreatePreAggregateTableCommand(
loadCommand.dataFrame = Some(PreAggregateUtil
.getDataFrame(sparkSession, loadCommand.logicalPlan.get))
PreAggregateUtil.startDataLoadForDataMap(
- parentTable,
+ TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)),
segmentToLoad = "*",
validateSegments = true,
- sparkSession,
- loadCommand)
+ loadCommand,
+ isOverwrite = false,
+ sparkSession)
}
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 7b273ba..ed6be97 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -17,19 +17,26 @@
package org.apache.spark.sql.execution.command.preaaggregate
+import java.util.UUID
+
import scala.collection.JavaConverters._
import scala.collection.mutable
-import org.apache.spark.sql.{SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableModel
import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
/**
* below class will be used to create load command for compaction
@@ -71,9 +78,13 @@ object CompactionProcessMetaListener extends OperationEventListener {
childDataFrame,
false,
sparkSession)
+ val uuid = Option(operationContext.getProperty("uuid")).
+ getOrElse(UUID.randomUUID()).toString
+ operationContext.setProperty("uuid", uuid)
loadCommand.processMetadata(sparkSession)
operationContext
.setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", loadCommand)
+ loadCommand.operationContext = operationContext
}
} else if (table.isChildDataMap) {
val childTableName = table.getTableName
@@ -95,9 +106,13 @@ object CompactionProcessMetaListener extends OperationEventListener {
childDataFrame,
false,
sparkSession)
+ val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
loadCommand.processMetadata(sparkSession)
operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
+ operationContext.setProperty("uuid", uuid)
+ loadCommand.operationContext = operationContext
}
+
}
}
@@ -127,12 +142,17 @@ object LoadProcessMetaListener extends OperationEventListener {
val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
val parentTableName = table.getTableName
val databaseName = table.getDatabaseName
+ // if the table is a child then extract the uuid from the operation context; the parent
+ // would have already generated the UUID.
+ // if it is the parent table then generate a new UUID, else use empty.
+ val uuid =
+ Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
val childTableName = dataMapSchema.getRelationIdentifier.getTableName
val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) {
- PreAggregateUtil.getChildQuery(dataMapSchema)
+ (PreAggregateUtil.getChildQuery(dataMapSchema), "")
} else {
// for timeseries rollup policy
val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
@@ -140,18 +160,19 @@ object LoadProcessMetaListener extends OperationEventListener {
list += dataMapSchema
// if non of the rollup data map is selected hit the maintable and prepare query
if (tableSelectedForRollup.isEmpty) {
- PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
+ (PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
parentTableName,
- databaseName)
+ databaseName), "")
} else {
// otherwise hit the select rollup datamap schema
- PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
+ (PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
tableSelectedForRollup.get,
- databaseName)
+ databaseName),
+ s"$databaseName.${tableSelectedForRollup.get.getChildSchema.getTableName}")
}
}
val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
- childSelectQuery)).drop("preAggLoad")
+ childSelectQuery._1)).drop("preAggLoad")
val isOverwrite =
operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
val loadCommand = PreAggregateUtil.createLoadCommandForChild(
@@ -159,7 +180,10 @@ object LoadProcessMetaListener extends OperationEventListener {
TableIdentifier(childTableName, Some(childDatabaseName)),
childDataFrame,
isOverwrite,
- sparkSession)
+ sparkSession,
+ timeseriesParentTableName = childSelectQuery._2)
+ operationContext.setProperty("uuid", uuid)
+ loadCommand.operationContext.setProperty("uuid", uuid)
loadCommand.processMetadata(sparkSession)
operationContext.setProperty(dataMapSchema.getChildSchema.getTableName, loadCommand)
}
@@ -191,25 +215,172 @@ object LoadPostAggregateListener extends OperationEventListener {
.asInstanceOf[CarbonLoadDataCommand]
childLoadCommand.dataFrame = Some(PreAggregateUtil
.getDataFrame(sparkSession, childLoadCommand.logicalPlan.get))
- val childOperationContext = new OperationContext
- childOperationContext
- .setProperty(dataMapSchema.getChildSchema.getTableName,
- operationContext.getProperty(dataMapSchema.getChildSchema.getTableName))
val isOverwrite =
operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
- childOperationContext.setProperty("isOverwrite", isOverwrite)
- childOperationContext.setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction",
- operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction"))
- childLoadCommand.operationContext = childOperationContext
+ childLoadCommand.operationContext = operationContext
+ val timeseriesParent = childLoadCommand.internalOptions.get("timeseriesParent")
+ val (parentTableIdentifier, segmentToLoad) =
+ if (timeseriesParent.isDefined && timeseriesParent.get.nonEmpty) {
+ val (parentTableDatabase, parentTableName) =
+ (timeseriesParent.get.split('.')(0), timeseriesParent.get.split('.')(1))
+ (TableIdentifier(parentTableName, Some(parentTableDatabase)),
+ operationContext.getProperty(
+ s"${parentTableDatabase}_${parentTableName}_Segment").toString)
+ } else {
+ (TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
+ carbonLoadModel.getSegmentId)
+ }
PreAggregateUtil.startDataLoadForDataMap(
- table,
- carbonLoadModel.getSegmentId,
+ parentTableIdentifier,
+ segmentToLoad,
validateSegments = false,
- sparkSession,
- childLoadCommand)
+ childLoadCommand,
+ isOverwrite,
+ sparkSession)
+ }
+ }
+ }
+}
+
+/**
+ * This listener is used to commit all the child data aggregate tables in one transaction. If one
+ * fails, all will be reverted to their original state.
+ */
+object CommitPreAggregateListener extends OperationEventListener {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext): Unit = {
+ // The same listener is called for both compaction and load therefore getting the
+ // carbonLoadModel from the appropriate event.
+ val carbonLoadModel = event match {
+ case loadEvent: LoadTablePostStatusUpdateEvent =>
+ loadEvent.getCarbonLoadModel
+ case compactionEvent: AlterTableCompactionPostStatusUpdateEvent =>
+ compactionEvent.carbonLoadModel
+ }
+ val isCompactionFlow = Option(
+ operationContext.getProperty("isCompaction")).getOrElse("false").toString.toBoolean
+ val dataMapSchemas =
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getDataMapSchemaList
+ // extract all child LoadCommands
+ val childLoadCommands = if (!isCompactionFlow) {
+ // If not compaction flow then the key for load commands will be tableName
+ dataMapSchemas.asScala.map { dataMapSchema =>
+ operationContext.getProperty(dataMapSchema.getChildSchema.getTableName)
+ .asInstanceOf[CarbonLoadDataCommand]
+ }
+ } else {
+ // If compaction flow then the key for load commands will be tableName_Compaction
+ dataMapSchemas.asScala.map { dataMapSchema =>
+ operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction")
+ .asInstanceOf[CarbonLoadDataCommand]
+ }
+ }
+ if (dataMapSchemas.size() > 0) {
+ val uuid = operationContext.getProperty("uuid").toString
+ // keep committing until one fails
+ val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
+ val childCarbonTable = childLoadCommand.table
+ val carbonTablePath =
+ new CarbonTablePath(childCarbonTable.getCarbonTableIdentifier,
+ childCarbonTable.getTablePath)
+ // Generate table status file name with UUID, for example: tablestatus_1
+ val oldTableSchemaPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+ // Generate table status file name without UUID, for example: tablestatus
+ val newTableSchemaPath = carbonTablePath.getTableStatusFilePath
+ renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+ }
+ // if true then the commit for one of the child tables has failed
+ val commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.size()) != 0
+ if (commitFailed) {
+ LOGGER.warn("Reverting table status file to original state")
+ renamedDataMaps.foreach {
+ loadCommand =>
+ val carbonTable = loadCommand.table
+ val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+ carbonTable.getTablePath)
+ // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
+ val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath + "_backup_" + uuid
+ val tableSchemaPath = carbonTablePath.getTableStatusFilePath
+ markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, loadCommand)
+ renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
}
}
+ // after success/failure of commit delete all tablestatus files with UUID in their names.
+ // if commit failed then remove the segment directory
+ cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
+ operationContext,
+ uuid)
+ if (commitFailed) {
+ sys.error("Failed to update table status for pre-aggregate table")
+ }
+ }
+
+
+ }
+
+ private def markInProgressSegmentAsDeleted(tableStatusFile: String,
+ operationContext: OperationContext,
+ loadDataCommand: CarbonLoadDataCommand): Unit = {
+ val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+ val segmentBeingLoaded =
+ operationContext.getProperty(loadDataCommand.table.getTableUniqueName + "_Segment").toString
+ val newDetails = loadMetaDataDetails.collect {
+ case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+ detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+ detail
+ case others => others
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+ }
+
+ /**
+ * Used to rename table status files for commit operation.
+ */
+ private def renameDataMapTableStatusFiles(sourceFileName: String,
+ destinationFileName: String, uuid: String) = {
+ val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+ val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+ if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+ val backUpPostFix = if (uuid.nonEmpty) {
+ "_backup_" + uuid
+ } else {
+ ""
+ }
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+ if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+ LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+ oldCarbonFile.renameForce(destinationFileName)
+ } else {
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+ false
+ }
+ } else {
+ false
}
+ }
+
+ /**
+ * Used to remove table status files with UUID and segment folders.
+ */
+ private def cleanUpStaleTableStatusFiles(
+ childTables: Seq[CarbonTable],
+ operationContext: OperationContext,
+ uuid: String): Unit = {
+ childTables.foreach { childTable =>
+ val carbonTablePath = new CarbonTablePath(childTable.getCarbonTableIdentifier,
+ childTable.getTablePath)
+ val metaDataDir = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath)
+ val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.contains(uuid) || file.getName.contains("backup")
+ }
+ })
+ tableStatusFiles.foreach(_.delete())
+ }
+ }
}
/**
@@ -226,6 +397,7 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
val carbonTable = compactionEvent.carbonTable
val compactionType = compactionEvent.carbonMergerMapping.campactionType
+ val carbonLoadModel = compactionEvent.carbonLoadModel
val sparkSession = compactionEvent.sparkSession
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
@@ -236,6 +408,10 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
compactionType.toString,
Some(System.currentTimeMillis()),
"")
+ operationContext.setProperty(
+ dataMapSchema.getRelationIdentifier.getDatabaseName + "_" +
+ dataMapSchema.getRelationIdentifier.getTableName + "_Segment",
+ carbonLoadModel.getSegmentId)
CarbonAlterTableCompactionCommand(alterTableModel, operationContext = operationContext)
.run(sparkSession)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index dac5d5e..1d4ebec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -35,13 +35,16 @@ import org.apache.spark.sql.types.DataType
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -581,32 +584,33 @@ object PreAggregateUtil {
* This method will start load process on the data map
*/
def startDataLoadForDataMap(
- parentCarbonTable: CarbonTable,
+ parentTableIdentifier: TableIdentifier,
segmentToLoad: String,
validateSegments: Boolean,
- sparkSession: SparkSession,
- loadCommand: CarbonLoadDataCommand): Unit = {
+ loadCommand: CarbonLoadDataCommand,
+ isOverwrite: Boolean,
+ sparkSession: SparkSession): Unit = {
CarbonSession.threadSet(
CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- parentCarbonTable.getDatabaseName + "." +
- parentCarbonTable.getTableName,
+ parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+ parentTableIdentifier.table,
segmentToLoad)
CarbonSession.threadSet(
CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
- parentCarbonTable.getDatabaseName + "." +
- parentCarbonTable.getTableName, validateSegments.toString)
+ parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+ parentTableIdentifier.table, validateSegments.toString)
CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
try {
loadCommand.processData(sparkSession)
} finally {
CarbonSession.threadUnset(
CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- parentCarbonTable.getDatabaseName + "." +
- parentCarbonTable.getTableName)
+ parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+ parentTableIdentifier.table)
CarbonSession.threadUnset(
CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
- parentCarbonTable.getDatabaseName + "." +
- parentCarbonTable.getTableName)
+ parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+ parentTableIdentifier.table)
}
}
@@ -885,7 +889,8 @@ object PreAggregateUtil {
dataMapIdentifier: TableIdentifier,
dataFrame: DataFrame,
isOverwrite: Boolean,
- sparkSession: SparkSession): CarbonLoadDataCommand = {
+ sparkSession: SparkSession,
+ timeseriesParentTableName: String = ""): CarbonLoadDataCommand = {
val headers = columns.asScala.filter { column =>
!column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
}.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
@@ -896,7 +901,8 @@ object PreAggregateUtil {
Map("fileheader" -> headers),
isOverwriteTable = isOverwrite,
dataFrame = None,
- internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
+ internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
+ "timeseriesParent" -> timeseriesParentTableName),
logicalPlan = Some(dataFrame.queryExecution.logical))
loadCommand
}
@@ -904,4 +910,5 @@ object PreAggregateUtil {
def getDataFrame(sparkSession: SparkSession, child: LogicalPlan): DataFrame = {
Dataset.ofRows(sparkSession, child)
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 78964e7..190c72c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -147,6 +147,19 @@ public class LoadEvents {
return carbonTable;
}
}
+
+ public static class LoadTablePostStatusUpdateEvent extends Event {
+ private CarbonLoadModel carbonLoadModel;
+
+ public LoadTablePostStatusUpdateEvent(CarbonLoadModel carbonLoadModel) {
+ this.carbonLoadModel = carbonLoadModel;
+ }
+
+ public CarbonLoadModel getCarbonLoadModel() {
+ return carbonLoadModel;
+ }
+ }
+
/**
* Class for handling clean up in case of any failure and abort the operation.
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 12fc5c1..3a83427 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -150,6 +150,22 @@ public final class CarbonLoaderUtil {
public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
throws IOException {
+ return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, "");
+ }
+
+ /**
+ * This API will write the load level metadata for the loadmanagement module in order to
+ * manage the load and query execution management smoothly.
+ *
+ * @param newMetaEntry
+ * @param loadModel
+ * @param uuid
+ * @return boolean which determines whether status update is done or not.
+ * @throws IOException
+ */
+ public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
+ CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
+ throws IOException {
boolean status = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -159,7 +175,12 @@ public final class CarbonLoaderUtil {
if (!FileFactory.isFileExist(metadataPath, fileType)) {
FileFactory.mkdirs(metadataPath, fileType);
}
- String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+ String tableStatusPath;
+ if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) {
+ tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid);
+ } else {
+ tableStatusPath = carbonTablePath.getTableStatusFilePath();
+ }
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
int retryCount = CarbonLockUtil
@@ -314,7 +335,6 @@ public final class CarbonLoaderUtil {
new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
try {
-
dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
@@ -367,7 +387,7 @@ public final class CarbonLoaderUtil {
public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
- boolean insertOverwrite) throws IOException {
+ boolean insertOverwrite, String uuid) throws IOException {
LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails();
SegmentStatus status = SegmentStatus.INSERT_IN_PROGRESS;
if (insertOverwrite) {
@@ -381,18 +401,23 @@ public final class CarbonLoaderUtil {
}
CarbonLoaderUtil
.populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
- boolean entryAdded =
- CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite);
+ boolean entryAdded = CarbonLoaderUtil
+ .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid);
if (!entryAdded) {
throw new IOException("Dataload failed due to failure in table status updation for "
+ model.getTableName());
}
}
+ public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
+ boolean insertOverwrite) throws IOException {
+ readAndUpdateLoadProgressInTableMeta(model, insertOverwrite, "");
+ }
+
/**
* This method will update the load failure entry in the table status file
*/
- public static void updateTableStatusForFailure(CarbonLoadModel model)
+ public static void updateTableStatusForFailure(CarbonLoadModel model, String uuid)
throws IOException {
// in case if failure the load status should be "Marked for delete" so that it will be taken
// care during clean up
@@ -404,14 +429,22 @@ public final class CarbonLoaderUtil {
}
CarbonLoaderUtil
.populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true);
- boolean entryAdded =
- CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false);
+ boolean entryAdded = CarbonLoaderUtil.recordNewLoadMetadata(
+ loadMetaEntry, model, false, false, uuid);
if (!entryAdded) {
throw new IOException(
"Failed to update failure entry in table status for " + model.getTableName());
}
}
+ /**
+ * This method will update the load failure entry in the table status file with empty uuid.
+ */
+ public static void updateTableStatusForFailure(CarbonLoadModel model)
+ throws IOException {
+ updateTableStatusForFailure(model, "");
+ }
+
public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier)
throws IOException {
Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =