Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/03/27 15:11:24 UTC

[carbondata] branch master updated: [CARBONDATA-3333]Fixed No Sort Store Size issue and Compatibility issue after alter added column done in 1.1 and load in 1.5

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a7e3bd  [CARBONDATA-3333]Fixed No Sort Store Size issue and Compatibility issue after alter added column done in 1.1 and load in 1.5
4a7e3bd is described below

commit 4a7e3bdde32d56288878967e29b8761078cb640a
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Tue Mar 26 22:46:01 2019 +0530

    [CARBONDATA-3333]Fixed No Sort Store Size issue and Compatibility issue after alter added column done in 1.1 and load in 1.5
    
    Issue 1: Load fails in the latest version when the table was altered in an older version
    This is because the table spec was not created based on the sort column order, and the
    column pages were not re-arranged back to the schema order while writing
    
    Issue 2: Store size increased after PR#3140
    The store size increased after PR#3140, which degraded query performance. This PR reverts
    the changes done in PR#3140
---
 .../carbondata/core/datastore/TableSpec.java       | 58 ++++++++++++++------
 .../CarbonRowDataWriterProcessorStepImpl.java      | 61 ++++++++++++----------
 .../carbondata/processing/store/TablePage.java     | 49 ++++++++++++++---
 3 files changed, 117 insertions(+), 51 deletions(-)
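
Before the diffs, a minimal illustration of the mismatch behind Issue 1 (all names here are
hypothetical; this is not CarbonData code): the sort step hands data to the writer with sort
columns ahead of no-sort columns, but readers address column pages by schema order, so pages
written without remapping end up under the wrong schema index.

    // Hypothetical illustration of the ordering mismatch described in Issue 1.
    public class OrderingMismatchSketch {
      public static void main(String[] args) {
        // schema order: c0 existed first (no-sort), c1 was added later as a sort column
        String[] schemaOrder   = {"c0_no_sort", "c1_sort"};

        // the sort step emits sort columns first
        String[] sortStepOrder = {"c1_sort", "c0_no_sort"};

        // writing pages in sort-step order without remapping them to schema order...
        String[] writtenPages = sortStepOrder;

        // ...means a reader asking for schema index 0 gets c1's page instead of c0's,
        // giving wrong results for string columns or a decode failure for other types
        System.out.println("reader wants " + schemaOrder[0] + ", gets page of " + writtenPages[0]);
      }
    }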

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index a26d6ae..002104a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -71,30 +71,52 @@ public class TableSpec {
   }
 
   private void addDimensions(List<CarbonDimension> dimensions) {
-    int dimIndex = 0;
+    List<DimensionSpec> sortDimSpec = new ArrayList<>();
+    List<DimensionSpec> noSortDimSpec = new ArrayList<>();
+    List<DimensionSpec> noSortNoDictDimSpec = new ArrayList<>();
+    List<DimensionSpec> sortNoDictDimSpec = new ArrayList<>();
+    DimensionSpec spec;
+    short actualPosition = 0;
+    // the sort step's output is based on the sort column order, i.e. sort columns' data is
+    // present ahead of non-sort columns, so the table spec adds dimension specs in the same order
     for (int i = 0; i < dimensions.size(); i++) {
       CarbonDimension dimension = dimensions.get(i);
       if (dimension.isComplex()) {
-        DimensionSpec spec = new DimensionSpec(ColumnType.COMPLEX, dimension);
-        dimensionSpec[dimIndex++] = spec;
-        noDictionaryDimensionSpec.add(spec);
+        spec = new DimensionSpec(ColumnType.COMPLEX, dimension, actualPosition++);
       } else if (dimension.getDataType() == DataTypes.TIMESTAMP && !dimension
           .isDirectDictionaryEncoding()) {
-        DimensionSpec spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension);
-        dimensionSpec[dimIndex++] = spec;
-        noDictionaryDimensionSpec.add(spec);
+        spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension, actualPosition++);
       } else if (dimension.isDirectDictionaryEncoding()) {
-        DimensionSpec spec = new DimensionSpec(ColumnType.DIRECT_DICTIONARY, dimension);
-        dimensionSpec[dimIndex++] = spec;
+        spec = new DimensionSpec(ColumnType.DIRECT_DICTIONARY, dimension, actualPosition++);
       } else if (dimension.isGlobalDictionaryEncoding()) {
-        DimensionSpec spec = new DimensionSpec(ColumnType.GLOBAL_DICTIONARY, dimension);
-        dimensionSpec[dimIndex++] = spec;
+        spec = new DimensionSpec(ColumnType.GLOBAL_DICTIONARY, dimension, actualPosition++);
       } else {
-        DimensionSpec spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension);
-        dimensionSpec[dimIndex++] = spec;
-        noDictionaryDimensionSpec.add(spec);
+        spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension, actualPosition++);
+      }
+      if (dimension.isSortColumn()) {
+        sortDimSpec.add(spec);
+        if (!dimension.isDirectDictionaryEncoding() && !dimension.isGlobalDictionaryEncoding()
+            || spec.getColumnType() == ColumnType.COMPLEX) {
+          sortNoDictDimSpec.add(spec);
+        }
+      } else {
+        noSortDimSpec.add(spec);
+        if (!dimension.isDirectDictionaryEncoding() && !dimension.isGlobalDictionaryEncoding()
+            || spec.getColumnType() == ColumnType.COMPLEX) {
+          noSortNoDictDimSpec.add(spec);
+        }
       }
     }
+    // combine the result
+    final DimensionSpec[] sortDimensionSpecs =
+        sortDimSpec.toArray(new DimensionSpec[sortDimSpec.size()]);
+    final DimensionSpec[] noSortDimensionSpecs =
+        noSortDimSpec.toArray(new DimensionSpec[noSortDimSpec.size()]);
+    System.arraycopy(sortDimensionSpecs, 0, dimensionSpec, 0, sortDimensionSpecs.length);
+    System.arraycopy(noSortDimensionSpecs, 0, dimensionSpec, sortDimensionSpecs.length,
+        noSortDimensionSpecs.length);
+    noDictionaryDimensionSpec.addAll(sortNoDictDimSpec);
+    noDictionaryDimensionSpec.addAll(noSortNoDictDimSpec);
   }
 
   private void addMeasures(List<CarbonMeasure> measures) {
@@ -255,10 +277,13 @@ public class TableSpec {
     // indicate whether this dimension need to do inverted index
     private boolean doInvertedIndex;
 
-    DimensionSpec(ColumnType columnType, CarbonDimension dimension) {
+    // indicates the actual position of this dimension in the blocklet
+    private short actualPostion;
+    DimensionSpec(ColumnType columnType, CarbonDimension dimension, short actualPostion) {
       super(dimension.getColName(), dimension.getDataType(), columnType);
       this.inSortColumns = dimension.isSortColumn();
       this.doInvertedIndex = dimension.isUseInvertedIndex();
+      this.actualPostion = actualPostion;
     }
 
     public boolean isInSortColumns() {
@@ -269,6 +294,9 @@ public class TableSpec {
       return doInvertedIndex;
     }
 
+    public short getActualPostion() {
+      return actualPostion;
+    }
     @Override
     public void write(DataOutput out) throws IOException {
       super.write(out);
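
A compact, standalone sketch of what the addDimensions() change above does (class and field
names here are illustrative, not the real CarbonData types): specs are created in schema order
so each one records its schema position, then collected into sort and no-sort groups and
concatenated sort-first, matching the order in which the sort step hands data to the writer.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of the spec layout: sort-column specs first, then
    // no-sort specs, while each spec remembers its original schema position.
    public class DimensionLayoutSketch {

      static final class DimSpec {
        final String name;
        final short actualPosition;    // position of the column in the schema
        DimSpec(String name, short actualPosition) {
          this.name = name;
          this.actualPosition = actualPosition;
        }
      }

      static DimSpec[] buildSpecs(String[] names, boolean[] isSortColumn) {
        List<DimSpec> sortSpecs = new ArrayList<>();
        List<DimSpec> noSortSpecs = new ArrayList<>();
        short actualPosition = 0;
        for (int i = 0; i < names.length; i++) {
          DimSpec spec = new DimSpec(names[i], actualPosition++);
          if (isSortColumn[i]) {
            sortSpecs.add(spec);
          } else {
            noSortSpecs.add(spec);
          }
        }
        // combine: sort specs ahead of no-sort specs, mirroring the sort step's output order
        List<DimSpec> combined = new ArrayList<>(sortSpecs);
        combined.addAll(noSortSpecs);
        return combined.toArray(new DimSpec[0]);
      }

      public static void main(String[] args) {
        // schema: c1 (no-sort), c2 (sort) -> spec order becomes c2, c1
        DimSpec[] specs = buildSpecs(new String[] {"c1", "c2"}, new boolean[] {false, true});
        for (DimSpec spec : specs) {
          System.out.println(spec.name + " -> schema position " + spec.actualPosition);
        }
      }
    }

The recorded schema position is what TablePage uses in the last hunk of this commit to put the
encoded pages back into schema order.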
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 6345035..25f7cfb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -18,7 +18,9 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -81,17 +83,16 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
 
-  private CarbonFactHandler dataHandler;
+  private List<CarbonFactHandler> carbonFactHandlers;
 
   private ExecutorService executorService = null;
 
-  private static final Object lock = new Object();
-
   public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
     this.localDictionaryGeneratorMap =
         CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
+    this.carbonFactHandlers = new CopyOnWriteArrayList<>();
   }
 
   @Override public void initialize() throws IOException {
@@ -128,31 +129,20 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
           .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
               System.currentTimeMillis());
 
-      //Creating a Instance of CarbonFacthandler that will be passed to all the threads
-      String[] storeLocation = getStoreLocation();
-      DataMapWriterListener listener = getDataMapWriterListener(0);
-      CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-          .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);
-      model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
-      dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
-      dataHandler.initialise();
-
       if (iterators.length == 1) {
-        doExecute(iterators[0], 0, dataHandler);
+        doExecute(iterators[0], 0);
       } else {
         executorService = Executors.newFixedThreadPool(iterators.length,
             new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
                 .getCarbonTableIdentifier().getTableName(), true));
         Future[] futures = new Future[iterators.length];
         for (int i = 0; i < iterators.length; i++) {
-          futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i, dataHandler));
+          futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i));
         }
         for (Future future : futures) {
           future.get();
         }
       }
-      finish(dataHandler, 0);
-      dataHandler = null;
     } catch (CarbonDataWriterException e) {
       LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
       throw new CarbonDataLoadingException(
@@ -167,15 +157,31 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     return null;
   }
 
-  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
-      CarbonFactHandler dataHandler) throws IOException {
+  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
+    String[] storeLocation = getStoreLocation();
+    DataMapWriterListener listener = getDataMapWriterListener(0);
+    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
+        configuration, storeLocation, 0, iteratorIndex, listener);
+    model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
+    CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (iterator.hasNext()) {
       if (rowsNotExist) {
         rowsNotExist = false;
+        dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
+        this.carbonFactHandlers.add(dataHandler);
+        dataHandler.initialise();
       }
       processBatch(iterator.next(), dataHandler, iteratorIndex);
     }
+    try {
+      if (!rowsNotExist) {
+        finish(dataHandler, iteratorIndex);
+      }
+    } finally {
+      carbonFactHandlers.remove(dataHandler);
+    }
+
 
   }
 
@@ -300,9 +306,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       while (batch.hasNext()) {
         CarbonRow row = batch.next();
         CarbonRow converted = convertRow(row);
-        synchronized (lock) {
-          dataHandler.addDataToStore(converted);
-        }
+        dataHandler.addDataToStore(converted);
         readCounter[iteratorIndex]++;
       }
       writeCounter[iteratorIndex] += batch.getSize();
@@ -316,18 +320,15 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
     private Iterator<CarbonRowBatch> iterator;
     private int iteratorIndex = 0;
-    private CarbonFactHandler dataHandler = null;
 
-    DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
-        CarbonFactHandler dataHandler) {
+    DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
       this.iterator = iterator;
       this.iteratorIndex = iteratorIndex;
-      this.dataHandler = dataHandler;
     }
 
     @Override public void run() {
       try {
-        doExecute(this.iterator, iteratorIndex, dataHandler);
+        doExecute(this.iterator, iteratorIndex);
       } catch (IOException e) {
         LOGGER.error(e.getMessage(), e);
         throw new RuntimeException(e);
@@ -341,9 +342,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       if (null != executorService) {
         executorService.shutdownNow();
       }
-      if (null != dataHandler) {
-        dataHandler.finish();
-        dataHandler.closeHandler();
+      if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) {
+        for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
+          carbonFactHandler.finish();
+          carbonFactHandler.closeHandler();
+        }
       }
     }
   }
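
The change above drops the single shared CarbonFactHandler and the static lock around
addDataToStore() and goes back to one handler per iterator. A rough sketch of that pattern with
a generic writer interface (none of these names are the real CarbonData API):

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.function.Supplier;

    // Hypothetical per-thread writer pattern: each iterator gets its own writer, so
    // write calls need no lock; writers still in use are tracked in a
    // CopyOnWriteArrayList so a failure path can finish and close them safely.
    public class PerThreadWriterSketch {

      interface RowWriter {                    // stand-in for a fact handler
        void addRow(Object row);
        void finish();
        void close();
      }

      private final List<RowWriter> activeWriters = new CopyOnWriteArrayList<>();
      private final Supplier<RowWriter> writerFactory;

      PerThreadWriterSketch(Supplier<RowWriter> writerFactory) {
        this.writerFactory = writerFactory;
      }

      void execute(List<Iterator<Object>> iterators, ExecutorService pool) throws Exception {
        Future<?>[] futures = new Future<?>[iterators.size()];
        for (int i = 0; i < iterators.size(); i++) {
          final Iterator<Object> iterator = iterators.get(i);
          futures[i] = pool.submit(() -> writeAll(iterator));
        }
        for (Future<?> future : futures) {
          future.get();                        // propagate the first failure
        }
      }

      private void writeAll(Iterator<Object> iterator) {
        RowWriter writer = null;
        try {
          while (iterator.hasNext()) {
            if (writer == null) {              // create lazily, only if rows exist
              writer = writerFactory.get();
              activeWriters.add(writer);
            }
            writer.addRow(iterator.next());    // thread-local writer: no synchronization
          }
          if (writer != null) {
            writer.finish();
          }
        } finally {
          if (writer != null) {
            activeWriters.remove(writer);
          }
        }
      }

      void closeOnFailure() {
        for (RowWriter writer : activeWriters) {  // safe to iterate while threads mutate the list
          writer.finish();
          writer.close();
        }
      }
    }

In the diff above the equivalent cleanup lives in close(), which walks carbonFactHandlers, and
the lazy creation inside doExecute() avoids building a handler for an iterator that never
produced a row.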
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 7cc8932..5687549 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -393,12 +392,14 @@ public class TablePage {
   private EncodedColumnPage[] encodeAndCompressDimensions()
       throws KeyGenException, IOException, MemoryException {
     List<EncodedColumnPage> encodedDimensions = new ArrayList<>();
-    List<EncodedColumnPage> encodedComplexDimensions = new ArrayList<>();
+    EncodedColumnPage[][] complexColumnPages =
+        new EncodedColumnPage[complexDimensionPages.length][];
     TableSpec tableSpec = model.getTableSpec();
     int dictIndex = 0;
     int noDictIndex = 0;
     int complexDimIndex = 0;
     int numDimensions = tableSpec.getNumDimensions();
+    int totalComplexColumnSize = 0;
     for (int i = 0; i < numDimensions; i++) {
       ColumnPageEncoder columnPageEncoder;
       EncodedColumnPage encodedPage;
@@ -434,17 +435,51 @@ public class TablePage {
           break;
         case COMPLEX:
           EncodedColumnPage[] encodedPages = ColumnPageEncoder.encodeComplexColumn(
-              complexDimensionPages[complexDimIndex++]);
-          encodedComplexDimensions.addAll(Arrays.asList(encodedPages));
+              complexDimensionPages[complexDimIndex]);
+          complexColumnPages[complexDimIndex] = encodedPages;
+          totalComplexColumnSize += encodedPages.length;
+          complexDimIndex++;
           break;
         default:
           throw new IllegalArgumentException("unsupported dimension type:" + spec
               .getColumnType());
       }
     }
-
-    encodedDimensions.addAll(encodedComplexDimensions);
-    return encodedDimensions.toArray(new EncodedColumnPage[encodedDimensions.size()]);
+    // the code below combines the pages based on the actual column order in the carbon table.
+    // In older versions (e.g. 1.1) alter add column was supported only with sort columns,
+    // and the sort step returns data in sort column order (sort columns first),
+    // so the column pages must be arranged back into schema order; otherwise queries will
+    // either give wrong results (for string columns) or throw an exception (for non-string
+    // columns), because reading is based on schema order
+    int complexEncodedPageIndex = 0;
+    int normalEncodedPageIndex  = 0;
+    int currentPosition = 0;
+    EncodedColumnPage[] combinedList =
+        new EncodedColumnPage[encodedDimensions.size() + totalComplexColumnSize];
+    for (int i = 0; i < numDimensions; i++) {
+      TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
+      switch (spec.getColumnType()) {
+        case GLOBAL_DICTIONARY:
+        case DIRECT_DICTIONARY:
+        case PLAIN_VALUE:
+          // place the dimension page at its actual (schema) position;
+          // currentPosition accounts for complex columns having multiple child pages
+          combinedList[currentPosition + spec.getActualPostion()] =
+              encodedDimensions.get(normalEncodedPageIndex++);
+          break;
+        case COMPLEX:
+          EncodedColumnPage[] complexColumnPage = complexColumnPages[complexEncodedPageIndex++];
+          for (int j = 0; j < complexColumnPage.length; j++) {
+            combinedList[currentPosition + spec.getActualPostion() + j] = complexColumnPage[j];
+          }
+          // one position is already counted for the complex column itself, so subtract 1
+          currentPosition += complexColumnPage.length - 1;
+          break;
+        default:
+          throw new IllegalArgumentException("unsupported dimension type:" + spec.getColumnType());
+      }
+    }
+    return combinedList;
   }
 
   /**
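
Finally, a hedged sketch of the page re-arrangement that encodeAndCompressDimensions() now
performs (simplified: plain strings stand in for EncodedColumnPage, and the position and
complex-flag arrays stand in for the spec metadata). Each primitive page is placed at
currentPosition + actualPosition, a complex column's children occupy consecutive slots, and
currentPosition is then advanced by childCount - 1 so later columns still land on their schema
positions.

    import java.util.Arrays;

    // Hypothetical, simplified mirror of the combine step: primitive pages were encoded in
    // "sort columns first" order and complex pages separately; this places every page back
    // into schema order using each spec's actual (schema) position.
    public class CombineSketch {

      static String[] combineInSchemaOrder(
          String[] primitivePagesInSpecOrder,   // one page per primitive dimension, spec order
          String[][] complexChildPages,         // child pages per complex dimension, spec order
          int[] actualPosition,                 // schema position of each dimension spec
          boolean[] isComplex) {
        int totalComplexPages = 0;
        for (String[] children : complexChildPages) {
          totalComplexPages += children.length;
        }
        String[] combined = new String[primitivePagesInSpecOrder.length + totalComplexPages];
        int primitiveIndex = 0;
        int complexIndex = 0;
        int currentPosition = 0;                // grows as complex children take extra slots
        for (int i = 0; i < actualPosition.length; i++) {
          if (!isComplex[i]) {
            combined[currentPosition + actualPosition[i]] =
                primitivePagesInSpecOrder[primitiveIndex++];
          } else {
            String[] children = complexChildPages[complexIndex++];
            for (int j = 0; j < children.length; j++) {
              combined[currentPosition + actualPosition[i] + j] = children[j];
            }
            // one slot is already counted by the complex column's own position
            currentPosition += children.length - 1;
          }
        }
        return combined;
      }

      public static void main(String[] args) {
        // specs in spec order: s1 (sort, schema position 0), c1 (complex, 2 children, position 1)
        String[] primitive = {"page(s1)"};
        String[][] complexChildren = {{"page(c1.child0)", "page(c1.child1)"}};
        System.out.println(Arrays.toString(combineInSchemaOrder(
            primitive, complexChildren, new int[] {0, 1}, new boolean[] {false, true})));
        // -> [page(s1), page(c1.child0), page(c1.child1)]
      }
    }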