You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/17 01:57:00 UTC
carbondata git commit: [CARBONDATA-1732] Add S3 support in FileFactory
Repository: carbondata
Updated Branches:
refs/heads/master 6551620b2 -> 733bb516d
[CARBONDATA-1732] Add S3 support in FileFactory
This closes #1504
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/733bb516
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/733bb516
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/733bb516
Branch: refs/heads/master
Commit: 733bb516dc3fc4a1e2be02b6574c70aafa7d3b9d
Parents: 6551620
Author: Jacky Li <ja...@qq.com>
Authored: Thu Nov 16 17:27:21 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Fri Nov 17 09:54:56 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 21 +++++++-------
.../core/datastore/impl/FileFactory.java | 30 +++++++++++++++++---
.../apache/carbondata/core/util/CarbonUtil.java | 11 ++++---
.../core/util/path/HDFSLeaseUtils.java | 1 +
.../carbondata/hadoop/util/SchemaReader.java | 1 +
.../spark/rdd/CarbonDataRDDFactory.scala | 20 +++++++++----
6 files changed, 61 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index aeca19f..0a7dfdd 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -147,19 +147,21 @@ public final class CarbonCommonConstants {
* Load Folder Name
*/
public static final String LOAD_FOLDER = "Segment_";
- /**
- * HDFSURL_PREFIX
- */
+
public static final String HDFSURL_PREFIX = "hdfs://";
- /**
- * VIEWFSURL_PREFIX
- */
+
+ public static final String LOCAL_FILE_PREFIX = "file://";
+
public static final String VIEWFSURL_PREFIX = "viewfs://";
- /**
- * ALLUXIO_PREFIX
- */
public static final String ALLUXIOURL_PREFIX = "alluxio://";
+
+ public static final String S3_PREFIX = "s3://";
+
+ public static final String S3N_PREFIX = "s3n://";
+
+ public static final String S3A_PREFIX = "s3a://";
+
/**
* FS_DEFAULT_FS
*/
@@ -1261,7 +1263,6 @@ public final class CarbonCommonConstants {
public static final String MAJOR = "major";
- public static final String LOCAL_FILE_PREFIX = "file://";
@CarbonProperty
public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution";
public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false";
http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index e4e4ae2..57a48ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -73,6 +73,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
return new DFSFileHolderImpl();
default:
return new FileHolderImpl();
@@ -80,12 +81,17 @@ public final class FileFactory {
}
public static FileType getFileType(String path) {
- if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+ String lowerPath = path.toLowerCase();
+ if (lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
return FileType.HDFS;
- } else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
+ } else if (lowerPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
return FileType.ALLUXIO;
- } else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
+ } else if (lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
return FileType.VIEWFS;
+ } else if (lowerPath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
+ lowerPath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
+ lowerPath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+ return FileType.S3;
}
return FileType.LOCAL;
}
@@ -99,6 +105,7 @@ public final class FileFactory {
case LOCAL:
return new LocalCarbonFile(getUpdatedFilePath(path, fileType));
case HDFS:
+ case S3:
return new HDFSCarbonFile(path);
case ALLUXIO:
return new AlluxioCarbonFile(path);
@@ -134,6 +141,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
if (bufferSize == -1) {
@@ -176,6 +184,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataInputStream stream = fs.open(pt, bufferSize);
@@ -203,6 +212,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
return fs.create(pt, true);
@@ -222,6 +232,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataOutputStream stream = null;
@@ -255,6 +266,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
return fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
@@ -280,6 +292,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
if (performFileCheck) {
@@ -314,6 +327,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.exists(path);
@@ -332,6 +346,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.createNewFile(path);
@@ -350,6 +365,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.delete(path, true);
@@ -399,6 +415,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.mkdirs(path);
@@ -428,6 +445,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
return fs.append(pt);
@@ -461,6 +479,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
fs.truncate(pt, newSize);
@@ -493,6 +512,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
if (fs.createNewFile(path)) {
@@ -509,7 +529,7 @@ public final class FileFactory {
}
public enum FileType {
- LOCAL, HDFS, ALLUXIO, VIEWFS
+ LOCAL, HDFS, ALLUXIO, VIEWFS, S3
}
/**
@@ -526,6 +546,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
return filePath;
case LOCAL:
default:
@@ -574,6 +595,7 @@ public final class FileFactory {
case HDFS:
case ALLUXIO:
case VIEWFS:
+ case S3:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.getContentSummary(path).getLength();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e709df7..bdd7ba3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -754,10 +754,13 @@ public final class CarbonUtil {
private static boolean checkIfPrefixExists(String path) {
final String lowerPath = path.toLowerCase();
- return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || lowerPath
- .startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || lowerPath
- .startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) || lowerPath
- .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX);
+ return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) ||
+ lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) ||
+ lowerPath.startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) ||
+ lowerPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) ||
+ lowerPath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
+ lowerPath.startsWith(CarbonCommonConstants.S3_PREFIX) ||
+ lowerPath.startsWith(CarbonCommonConstants.S3A_PREFIX);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
index c72c322..fcd1655 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
@@ -78,6 +78,7 @@ public class HDFSLeaseUtils {
switch (fileType) {
case ALLUXIO:
case HDFS:
+ case S3:
Path path = FileFactory.getPath(filePath);
FileSystem fs = FileFactory.getFileSystem(path);
return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) fs);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index f1ce324..2e6abad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -42,6 +42,7 @@ public class SchemaReader {
String schemaFilePath = carbonTablePath.getSchemaFilePath();
if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) ||
FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
String tableName = identifier.getCarbonTableIdentifier().getTableName();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7c5599b..1ca7456 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -487,8 +487,17 @@ object CarbonDataRDDFactory {
carbonTable.getCarbonTableIdentifier,
carbonLoadModel)
OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
- updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
-
+ val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
+ if (!done) {
+ CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+ LOGGER.info("********starting clean up**********")
+ CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+ LOGGER.info("********clean up done**********")
+ LOGGER.audit("Data load is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ LOGGER.error("Data load failed due to failure in table status updation.")
+ throw new Exception("Data load failed due to failure in table status updation.")
+ }
if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
LOGGER.audit("Data load is partially successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -738,7 +747,7 @@ object CarbonDataRDDFactory {
carbonLoadModel: CarbonLoadModel,
loadStatus: SegmentStatus,
overwriteTable: Boolean
- ): Unit = {
+ ): Boolean = {
val metadataDetails = if (status != null && status(0) != null) {
status(0)._2._1
} else {
@@ -749,9 +758,9 @@ object CarbonDataRDDFactory {
loadStatus,
carbonLoadModel.getFactTimeStamp,
true)
- val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
+ val done = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
overwriteTable)
- if (!success) {
+ if (!done) {
val errorMessage = "Dataload failed due to failure in table status updation."
LOGGER.audit("Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -761,6 +770,7 @@ object CarbonDataRDDFactory {
// TODO : Handle it
LOGGER.info("********Database updated**********")
}
+ done
}