You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/05/17 14:15:13 UTC
carbondata git commit: [CARBONDATA-2465] Improve the carbondata file
reliability in data load when direct hdfs write is enabled
Repository: carbondata
Updated Branches:
refs/heads/master 6297ea0b4 -> 8fe165668
[CARBONDATA-2465] Improve the carbondata file reliability in data load when direct hdfs write is enabled
Problem: At present, if direct write to HDFS is enabled, data is written with a replication factor of 1, which can cause data loss.
Solution: Write with the cluster's default replication factor. With this change, there is no need to invoke
CompleteHdfsBackendThread/completeRemainingHdfsReplicas in the direct HDFS write case.
This closes #2235
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fe16566
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fe16566
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fe16566
Branch: refs/heads/master
Commit: 8fe165668e2662455991f9de6af817ccc99b81ee
Parents: 6297ea0
Author: KanakaKumar <ka...@huawei.com>
Authored: Thu Apr 26 23:39:29 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Thu May 17 19:42:59 2018 +0530
----------------------------------------------------------------------
.../apache/carbondata/core/util/CarbonUtil.java | 27 -----------
.../store/writer/AbstractFactDataWriter.java | 47 ++++++--------------
2 files changed, 14 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe16566/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ac0a800..9dc4aa2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2817,33 +2817,6 @@ public final class CarbonUtil {
}
/**
- * This method will complete the remaining hdfs replications
- *
- * @param fileName hdfs file name
- * @param fileType filetype
- * @throws CarbonDataWriterException if error occurs
- */
- public static void completeRemainingHdfsReplicas(String fileName, FileFactory.FileType fileType)
- throws CarbonDataWriterException {
- try {
- long startTime = System.currentTimeMillis();
- short replication = FileFactory.getDefaultReplication(fileName, fileType);
- if (1 == replication) {
- return;
- }
- boolean replicateFlag = FileFactory.setReplication(fileName, fileType, replication);
- if (!replicateFlag) {
- LOGGER.error("Failed to set replication for " + fileName + " with factor " + replication);
- }
- LOGGER.info(
- "Total copy time (ms) to copy file " + fileName + " is " + (System.currentTimeMillis()
- - startTime));
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while completing remaining HDFS backups", e);
- }
- }
-
- /**
* This method will read the local carbon data file and write to carbon data file in HDFS
*
* @param carbonStoreFilePath
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe16566/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 6e557cd..8115f97 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -176,6 +176,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
+
if (enableDirectlyWriteData2Hdfs) {
LOGGER.info("Carbondata will directly write fact data to HDFS.");
} else {
@@ -274,22 +275,13 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
protected void commitCurrentFile(boolean copyInCurrentThread) {
notifyDataMapBlockEnd();
CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
- if (enableDirectlyWriteData2Hdfs) {
- if (copyInCurrentThread) {
- CarbonUtil.completeRemainingHdfsReplicas(carbonDataFileHdfsPath,
- FileFactory.FileType.HDFS);
- } else {
- executorServiceSubmitList.add(executorService.submit(
- new CompleteHdfsBackendThread(carbonDataFileHdfsPath, FileFactory.FileType.HDFS)));
- }
- } else {
+ if (!enableDirectlyWriteData2Hdfs) {
if (copyInCurrentThread) {
CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
- model.getCarbonDataDirectoryPath(),
- fileSizeInBytes);
+ model.getCarbonDataDirectoryPath(), fileSizeInBytes);
} else {
executorServiceSubmitList.add(executorService.submit(
- new CompleteHdfsBackendThread(carbonDataFileTempPath, FileFactory.FileType.LOCAL)));
+ new CompleteHdfsBackendThread(carbonDataFileTempPath)));
}
}
}
@@ -310,10 +302,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
if (enableDirectlyWriteData2Hdfs) {
// the block size will be twice the block_size specified by user to make sure that
// one carbondata file only consists exactly one HDFS block.
- // Here we write the first replication and will complete the remaining later.
- fileOutputStream = FileFactory.getDataOutputStream(carbonDataFileHdfsPath,
- FileFactory.FileType.HDFS, CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2,
- (short) 1);
+ fileOutputStream = FileFactory
+ .getDataOutputStream(carbonDataFileHdfsPath, FileFactory.FileType.HDFS,
+ CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2);
} else {
//each time we initialize writer, we choose a local temp location randomly
String[] tempFileLocations = model.getStoreLocation();
@@ -416,13 +407,10 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
writer.writeThrift(blockIndex);
}
writer.close();
- if (enableDirectlyWriteData2Hdfs) {
- executorServiceSubmitList.add(executorService.submit(
- new CompleteHdfsBackendThread(indexFileName, FileFactory.FileType.HDFS)));
- } else {
- CarbonUtil.copyCarbonDataFileToCarbonStorePath(indexFileName,
- model.getCarbonDataDirectoryPath(),
- fileSizeInBytes);
+ if (!enableDirectlyWriteData2Hdfs) {
+ CarbonUtil
+ .copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
+ fileSizeInBytes);
}
}
@@ -459,11 +447,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
* carbon store path
*/
private String fileName;
- private FileFactory.FileType fileType;
- private CompleteHdfsBackendThread(String fileName, FileFactory.FileType fileType) {
+ private CompleteHdfsBackendThread(String fileName) {
this.fileName = fileName;
- this.fileType = fileType;
}
/**
@@ -474,13 +460,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
*/
@Override
public Void call() throws Exception {
- if (FileFactory.FileType.HDFS == fileType) {
- CarbonUtil.completeRemainingHdfsReplicas(fileName, fileType);
- } else {
- CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
- model.getCarbonDataDirectoryPath(),
- fileSizeInBytes);
- }
+ CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName, model.getCarbonDataDirectoryPath(),
+ fileSizeInBytes);
return null;
}
}