You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/07 09:55:06 UTC

[03/49] incubator-carbondata git commit: Changes done: 1. Clean up of folders created locally during data load and insert into operations. 2. Setting the load status properly for success, partial success and failure cases. 3. Printing load statistics in case of success and partial success.

Changes done:
1. Clean up the folders created locally during data load and INSERT INTO operations.
2. Set the load status correctly for the success, partial success and failure cases.
3. Print the load statistics in the success and partial success cases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/487e41dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/487e41dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/487e41dd

Branch: refs/heads/12-dev
Commit: 487e41ddd13d9e19813cc1b9c4eda73376f1c8ba
Parents: ada081d
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Apr 5 19:04:14 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 6 10:33:07 2017 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java | 30 ++++++++++++++++----
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 30 ++++++++++++++++----
 2 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/487e41dd/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index cc16398..95f0b10 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -36,6 +36,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -171,13 +174,30 @@ public final class CarbonLoaderUtil {
       tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
     }
     // form local store location
-    String localStoreLocation = CarbonProperties.getInstance()
+    final String localStoreLocation = CarbonProperties.getInstance()
         .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    // submit local folder clean up in another thread so that main thread execution is not blocked
+    ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
     try {
-      CarbonUtil.deleteFoldersAndFiles(new File(localStoreLocation).getParentFile());
-      LOGGER.info("Deleted the local store location" + localStoreLocation);
-    } catch (IOException | InterruptedException e) {
-      LOGGER.error(e, "Failed to delete local data load folder location");
+      localFolderDeletionService.submit(new Callable<Void>() {
+        @Override public Void call() throws Exception {
+          try {
+            long startTime = System.currentTimeMillis();
+            File file = new File(localStoreLocation);
+            CarbonUtil.deleteFoldersAndFiles(file.getParentFile());
+            LOGGER.info(
+                "Deleted the local store location" + localStoreLocation + " : TIme taken: " + (
+                    System.currentTimeMillis() - startTime));
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error(e, "Failed to delete local data load folder location");
+          }
+          return null;
+        }
+      });
+    } finally {
+      if (null != localFolderDeletionService) {
+        localFolderDeletionService.shutdown();
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/487e41dd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 0690ba1..72ee90f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -223,7 +223,7 @@ class NewCarbonDataLoadRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
 
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
@@ -237,7 +237,6 @@ class NewCarbonDataLoadRDD[K, V](
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         new DataLoadExecutor().execute(model,
           loader.storeLocation,
           recordReaders)
@@ -246,9 +245,20 @@ class NewCarbonDataLoadRDD[K, V](
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
           logInfo("Bad Record Found")
         case e: Exception =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           logInfo("DataLoad failure", e)
           LOGGER.error(e)
           throw e
+      } finally {
+        // clean up the folders and files created locally for data load operation
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        // in case of failure the same operation will be re-tried several times.
+        // So print the data load statistics only in case of non failure case
+        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+          .equals(loadMetadataDetails.getLoadStatus)) {
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance
+            .printStatisticsInfo(model.getPartitionId)
+        }
       }
 
       def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
@@ -389,7 +399,7 @@ class NewDataFrameLoaderRDD[K, V](
       try {
 
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
@@ -420,18 +430,26 @@ class NewDataFrameLoaderRDD[K, V](
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
-
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
-
       } catch {
         case e: BadRecordFoundException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
           logInfo("Bad Record Found")
         case e: Exception =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           logInfo("DataLoad failure", e)
           LOGGER.error(e)
           throw e
+      } finally {
+        // clean up the folders and files created locally for data load operation
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        // in case of failure the same operation will be re-tried several times.
+        // So print the data load statistics only in case of non failure case
+        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+          .equals(loadMetadataDetails.getLoadStatus)) {
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance
+            .printStatisticsInfo(model.getPartitionId)
+        }
       }
       var finished = false