You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/17 01:57:00 UTC

carbondata git commit: [CARBONDATA-1732] Add S3 support in FileFactory

Repository: carbondata
Updated Branches:
  refs/heads/master 6551620b2 -> 733bb516d


[CARBONDATA-1732] Add S3 support in FileFactory

This closes #1504


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/733bb516
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/733bb516
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/733bb516

Branch: refs/heads/master
Commit: 733bb516dc3fc4a1e2be02b6574c70aafa7d3b9d
Parents: 6551620
Author: Jacky Li <ja...@qq.com>
Authored: Thu Nov 16 17:27:21 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Fri Nov 17 09:54:56 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 21 +++++++-------
 .../core/datastore/impl/FileFactory.java        | 30 +++++++++++++++++---
 .../apache/carbondata/core/util/CarbonUtil.java | 11 ++++---
 .../core/util/path/HDFSLeaseUtils.java          |  1 +
 .../carbondata/hadoop/util/SchemaReader.java    |  1 +
 .../spark/rdd/CarbonDataRDDFactory.scala        | 20 +++++++++----
 6 files changed, 61 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index aeca19f..0a7dfdd 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -147,19 +147,21 @@ public final class CarbonCommonConstants {
    * Load Folder Name
    */
   public static final String LOAD_FOLDER = "Segment_";
-  /**
-   * HDFSURL_PREFIX
-   */
+
   public static final String HDFSURL_PREFIX = "hdfs://";
-  /**
-   * VIEWFSURL_PREFIX
-   */
+
+  public static final String LOCAL_FILE_PREFIX = "file://";
+
   public static final String VIEWFSURL_PREFIX = "viewfs://";
 
-  /**
-   * ALLUXIO_PREFIX
-   */
   public static final String ALLUXIOURL_PREFIX = "alluxio://";
+
+  public static final String S3_PREFIX = "s3://";
+
+  public static final String S3N_PREFIX = "s3n://";
+
+  public static final String S3A_PREFIX = "s3a://";
+
   /**
    * FS_DEFAULT_FS
    */
@@ -1261,7 +1263,6 @@ public final class CarbonCommonConstants {
 
   public static final String MAJOR = "major";
 
-  public static final String LOCAL_FILE_PREFIX = "file://";
   @CarbonProperty
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution";
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index e4e4ae2..57a48ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -73,6 +73,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         return new DFSFileHolderImpl();
       default:
         return new FileHolderImpl();
@@ -80,12 +81,17 @@ public final class FileFactory {
   }
 
   public static FileType getFileType(String path) {
-    if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+    String lowerPath = path.toLowerCase();
+    if (lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
       return FileType.HDFS;
-    } else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
+    } else if (lowerPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
       return FileType.ALLUXIO;
-    } else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
+    } else if (lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
       return FileType.VIEWFS;
+    } else if (lowerPath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
+        lowerPath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
+        lowerPath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+      return FileType.S3;
     }
     return FileType.LOCAL;
   }
@@ -99,6 +105,7 @@ public final class FileFactory {
       case LOCAL:
         return new LocalCarbonFile(getUpdatedFilePath(path, fileType));
       case HDFS:
+      case S3:
         return new HDFSCarbonFile(path);
       case ALLUXIO:
         return new AlluxioCarbonFile(path);
@@ -134,6 +141,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         if (bufferSize == -1) {
@@ -176,6 +184,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataInputStream stream = fs.open(pt, bufferSize);
@@ -203,6 +212,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         return fs.create(pt, true);
@@ -222,6 +232,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataOutputStream stream = null;
@@ -255,6 +266,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         return fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
@@ -280,6 +292,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         if (performFileCheck) {
@@ -314,6 +327,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.exists(path);
@@ -332,6 +346,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.createNewFile(path);
@@ -350,6 +365,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.delete(path, true);
@@ -399,6 +415,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.mkdirs(path);
@@ -428,6 +445,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         return fs.append(pt);
@@ -461,6 +479,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         fs.truncate(pt, newSize);
@@ -493,6 +512,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         if (fs.createNewFile(path)) {
@@ -509,7 +529,7 @@ public final class FileFactory {
   }
 
   public enum FileType {
-    LOCAL, HDFS, ALLUXIO, VIEWFS
+    LOCAL, HDFS, ALLUXIO, VIEWFS, S3
   }
 
   /**
@@ -526,6 +546,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         return filePath;
       case LOCAL:
       default:
@@ -574,6 +595,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case S3:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.getContentSummary(path).getLength();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e709df7..bdd7ba3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -754,10 +754,13 @@ public final class CarbonUtil {
 
   private static boolean checkIfPrefixExists(String path) {
     final String lowerPath = path.toLowerCase();
-    return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || lowerPath
-        .startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || lowerPath
-        .startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) || lowerPath
-        .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX);
+    return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) ||
+        lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) ||
+        lowerPath.startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) ||
+        lowerPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) ||
+        lowerPath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
+        lowerPath.startsWith(CarbonCommonConstants.S3_PREFIX) ||
+        lowerPath.startsWith(CarbonCommonConstants.S3A_PREFIX);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
index c72c322..fcd1655 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
@@ -78,6 +78,7 @@ public class HDFSLeaseUtils {
     switch (fileType) {
       case ALLUXIO:
       case HDFS:
+      case S3:
         Path path = FileFactory.getPath(filePath);
         FileSystem fs = FileFactory.getFileSystem(path);
         return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) fs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index f1ce324..2e6abad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -42,6 +42,7 @@ public class SchemaReader {
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
     if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
         FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) ||
         FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
       String tableName = identifier.getCarbonTableIdentifier().getTableName();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/733bb516/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7c5599b..1ca7456 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -487,8 +487,17 @@ object CarbonDataRDDFactory {
         carbonTable.getCarbonTableIdentifier,
         carbonLoadModel)
       OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
-      updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
-
+      val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
+      if (!done) {
+        CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+        LOGGER.info("********starting clean up**********")
+        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+        LOGGER.info("********clean up done**********")
+        LOGGER.audit("Data load is failed for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        LOGGER.error("Data load failed due to failure in table status updation.")
+        throw new Exception("Data load failed due to failure in table status updation.")
+      }
       if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
         LOGGER.audit("Data load is partially successful for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -738,7 +747,7 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       loadStatus: SegmentStatus,
       overwriteTable: Boolean
-  ): Unit = {
+  ): Boolean = {
     val metadataDetails = if (status != null && status(0) != null) {
       status(0)._2._1
     } else {
@@ -749,9 +758,9 @@ object CarbonDataRDDFactory {
       loadStatus,
       carbonLoadModel.getFactTimeStamp,
       true)
-    val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
+    val done = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
       overwriteTable)
-    if (!success) {
+    if (!done) {
       val errorMessage = "Dataload failed due to failure in table status updation."
       LOGGER.audit("Data load is failed for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -761,6 +770,7 @@ object CarbonDataRDDFactory {
       // TODO : Handle it
       LOGGER.info("********Database updated**********")
     }
+    done
   }