You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/03/12 07:26:57 UTC

[carbondata] branch master updated: [CARBONDATA-3307] Fix Performance Issue in No Sort

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f5e4793  [CARBONDATA-3307] Fix Performance Issue in No Sort
f5e4793 is described below

commit f5e4793bda2324f8417afc4fc7aaeb09acdea2a0
Author: shivamasn <sh...@gmail.com>
AuthorDate: Wed Mar 6 19:03:01 2019 +0530

    [CARBONDATA-3307] Fix Performance Issue in No Sort
    
    When a table is created without sort_columns and data is loaded into it, more carbondata files
    are generated than expected. Before this change, the number of carbondata files generated was equal to
    the number of threads launched, because each thread initialised its own writer and wrote its own file.
    
    With this change, the same writer instance is passed to all the threads, so all the threads write the data to the same file.
    
    This closes #3140
---
 .../CarbonRowDataWriterProcessorStepImpl.java      | 61 ++++++++++------------
 1 file changed, 29 insertions(+), 32 deletions(-)

diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index f976abe..184248c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -18,9 +18,7 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -83,16 +81,17 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
 
-  private List<CarbonFactHandler> carbonFactHandlers;
+  private CarbonFactHandler dataHandler;
 
   private ExecutorService executorService = null;
 
+  private static final Object lock = new Object();
+
   public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
     this.localDictionaryGeneratorMap =
         CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
-    this.carbonFactHandlers = new CopyOnWriteArrayList<>();
   }
 
   @Override public void initialize() throws IOException {
@@ -129,20 +128,31 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
           .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
               System.currentTimeMillis());
 
+      //Creating a Instance of CarbonFacthandler that will be passed to all the threads
+      String[] storeLocation = getStoreLocation();
+      DataMapWriterListener listener = getDataMapWriterListener(0);
+      CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+          .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);
+      model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
+      dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
+      dataHandler.initialise();
+
       if (iterators.length == 1) {
-        doExecute(iterators[0], 0);
+        doExecute(iterators[0], 0, dataHandler);
       } else {
         executorService = Executors.newFixedThreadPool(iterators.length,
             new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
                 .getCarbonTableIdentifier().getTableName()));
         Future[] futures = new Future[iterators.length];
         for (int i = 0; i < iterators.length; i++) {
-          futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i));
+          futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i, dataHandler));
         }
         for (Future future : futures) {
           future.get();
         }
       }
+      finish(dataHandler, 0);
+      dataHandler = null;
     } catch (CarbonDataWriterException e) {
       LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
       throw new CarbonDataLoadingException(
@@ -157,31 +167,15 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     return null;
   }
 
-  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
-    String[] storeLocation = getStoreLocation();
-    DataMapWriterListener listener = getDataMapWriterListener(0);
-    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
-        configuration, storeLocation, 0, iteratorIndex, listener);
-    model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
-    CarbonFactHandler dataHandler = null;
+  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
+      CarbonFactHandler dataHandler) throws IOException {
     boolean rowsNotExist = true;
     while (iterator.hasNext()) {
       if (rowsNotExist) {
         rowsNotExist = false;
-        dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
-        this.carbonFactHandlers.add(dataHandler);
-        dataHandler.initialise();
       }
       processBatch(iterator.next(), dataHandler, iteratorIndex);
     }
-    try {
-      if (!rowsNotExist) {
-        finish(dataHandler, iteratorIndex);
-      }
-    } finally {
-      carbonFactHandlers.remove(dataHandler);
-    }
-
 
   }
 
@@ -306,7 +300,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       while (batch.hasNext()) {
         CarbonRow row = batch.next();
         CarbonRow converted = convertRow(row);
-        dataHandler.addDataToStore(converted);
+        synchronized (lock) {
+          dataHandler.addDataToStore(converted);
+        }
         readCounter[iteratorIndex]++;
       }
       writeCounter[iteratorIndex] += batch.getSize();
@@ -320,15 +316,18 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
     private Iterator<CarbonRowBatch> iterator;
     private int iteratorIndex = 0;
+    private CarbonFactHandler dataHandler = null;
 
-    DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
+    DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
+        CarbonFactHandler dataHandler) {
       this.iterator = iterator;
       this.iteratorIndex = iteratorIndex;
+      this.dataHandler = dataHandler;
     }
 
     @Override public void run() {
       try {
-        doExecute(this.iterator, iteratorIndex);
+        doExecute(this.iterator, iteratorIndex, dataHandler);
       } catch (IOException e) {
         LOGGER.error(e.getMessage(), e);
         throw new RuntimeException(e);
@@ -342,11 +341,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       if (null != executorService) {
         executorService.shutdownNow();
       }
-      if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) {
-        for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
-          carbonFactHandler.finish();
-          carbonFactHandler.closeHandler();
-        }
+      if (null != dataHandler) {
+        dataHandler.finish();
+        dataHandler.closeHandler();
       }
     }
   }