You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:41:34 UTC

[carbondata] 14/41: [CARBONDATA-3307] Fix Performance Issue in No Sort

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 46fc6c550ec34faaeebbba792edc3efd77e1f701
Author: shivamasn <sh...@gmail.com>
AuthorDate: Wed Mar 6 19:03:01 2019 +0530

    [CARBONDATA-3307] Fix Performance Issue in No Sort
    
    When a table is created without sort_columns and data is loaded into it, more carbondata files are
    generated than expected. Currently, the number of carbondata files generated depends on the number of
    threads launched, because each thread initialises its own writer and writes its data to a separate file.
    
    With this change, the same writer instance is passed to all the threads, so all the threads write their data to the same file.
    
    This closes #3140
---
 .../CarbonRowDataWriterProcessorStepImpl.java      | 61 ++++++++++------------
 1 file changed, 29 insertions(+), 32 deletions(-)

diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index f976abe..184248c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -18,9 +18,7 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -83,16 +81,17 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
 
-  private List<CarbonFactHandler> carbonFactHandlers;
+  private CarbonFactHandler dataHandler;
 
   private ExecutorService executorService = null;
 
+  private static final Object lock = new Object();
+
   public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
     this.localDictionaryGeneratorMap =
         CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
-    this.carbonFactHandlers = new CopyOnWriteArrayList<>();
   }
 
   @Override public void initialize() throws IOException {
@@ -129,20 +128,31 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
           .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
               System.currentTimeMillis());
 
+      //Creating a Instance of CarbonFacthandler that will be passed to all the threads
+      String[] storeLocation = getStoreLocation();
+      DataMapWriterListener listener = getDataMapWriterListener(0);
+      CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+          .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);
+      model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
+      dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
+      dataHandler.initialise();
+
       if (iterators.length == 1) {
-        doExecute(iterators[0], 0);
+        doExecute(iterators[0], 0, dataHandler);
       } else {
         executorService = Executors.newFixedThreadPool(iterators.length,
             new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
                 .getCarbonTableIdentifier().getTableName()));
         Future[] futures = new Future[iterators.length];
         for (int i = 0; i < iterators.length; i++) {
-          futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i));
+          futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i, dataHandler));
         }
         for (Future future : futures) {
           future.get();
         }
       }
+      finish(dataHandler, 0);
+      dataHandler = null;
     } catch (CarbonDataWriterException e) {
       LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
       throw new CarbonDataLoadingException(
@@ -157,31 +167,15 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     return null;
   }
 
-  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
-    String[] storeLocation = getStoreLocation();
-    DataMapWriterListener listener = getDataMapWriterListener(0);
-    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
-        configuration, storeLocation, 0, iteratorIndex, listener);
-    model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
-    CarbonFactHandler dataHandler = null;
+  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
+      CarbonFactHandler dataHandler) throws IOException {
     boolean rowsNotExist = true;
     while (iterator.hasNext()) {
       if (rowsNotExist) {
         rowsNotExist = false;
-        dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
-        this.carbonFactHandlers.add(dataHandler);
-        dataHandler.initialise();
       }
       processBatch(iterator.next(), dataHandler, iteratorIndex);
     }
-    try {
-      if (!rowsNotExist) {
-        finish(dataHandler, iteratorIndex);
-      }
-    } finally {
-      carbonFactHandlers.remove(dataHandler);
-    }
-
 
   }
 
@@ -306,7 +300,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       while (batch.hasNext()) {
         CarbonRow row = batch.next();
         CarbonRow converted = convertRow(row);
-        dataHandler.addDataToStore(converted);
+        synchronized (lock) {
+          dataHandler.addDataToStore(converted);
+        }
         readCounter[iteratorIndex]++;
       }
       writeCounter[iteratorIndex] += batch.getSize();
@@ -320,15 +316,18 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
     private Iterator<CarbonRowBatch> iterator;
     private int iteratorIndex = 0;
+    private CarbonFactHandler dataHandler = null;
 
-    DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
+    DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
+        CarbonFactHandler dataHandler) {
       this.iterator = iterator;
       this.iteratorIndex = iteratorIndex;
+      this.dataHandler = dataHandler;
     }
 
     @Override public void run() {
       try {
-        doExecute(this.iterator, iteratorIndex);
+        doExecute(this.iterator, iteratorIndex, dataHandler);
       } catch (IOException e) {
         LOGGER.error(e.getMessage(), e);
         throw new RuntimeException(e);
@@ -342,11 +341,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       if (null != executorService) {
         executorService.shutdownNow();
       }
-      if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) {
-        for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
-          carbonFactHandler.finish();
-          carbonFactHandler.closeHandler();
-        }
+      if (null != dataHandler) {
+        dataHandler.finish();
+        dataHandler.closeHandler();
       }
     }
   }