Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/07 09:55:26 UTC

[23/49] incubator-carbondata git commit: Added class to handle sorting of data for compaction scenario

Added class to handle sorting of data for compaction scenario


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0f0907a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0f0907a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0f0907a2

Branch: refs/heads/12-dev
Commit: 0f0907a26ab51d98e07208426af8af6543aa2cb9
Parents: 58cd933
Author: ravikiran <ra...@gmail.com>
Authored: Wed Mar 15 20:37:26 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 6 16:17:24 2017 +0530

----------------------------------------------------------------------
 .../merger/CompactionResultSortProcessor.java   | 563 +++++++++++++++++++
 1 file changed, 563 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0f0907a2/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java
new file mode 100644
index 0000000..6cfe8b5
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.schema.metadata.SortObserver;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.pentaho.di.core.exception.KettleException;
+
+/**
+ * This class will process the query result and convert the data
+ * into a format compatible with data loading
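+ * <p>
+ * A minimal usage sketch (names and values here are illustrative; the load model,
+ * cardinality, column mappings and result iterators are prepared by the
+ * surrounding compaction flow):
+ * <pre>
+ *   CompactionResultSortProcessor processor =
+ *       new CompactionResultSortProcessor(loadModel, columnCardinality, "2",
+ *           "index_table", factToIndexColumnMapping, factToIndexDictColumnMapping);
+ *   processor.processQueryResult(detailQueryResultIteratorList);
+ * </pre>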
+ */
+public class CompactionResultSortProcessor {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompactionResultSortProcessor.class.getName());
+  /**
+   * carbon load model that contains all the required information for load
+   */
+  private CarbonLoadModel carbonLoadModel;
+  /**
+   * sortDataRows instance for sorting each row read and writing it to a sort temp file
+   */
+  private SortDataRows sortDataRows;
+  /**
+   * segment properties which contain the required information for a segment
+   */
+  private SegmentProperties segmentProperties;
+  /**
+   * segment information of parent table
+   */
+  private SegmentProperties srcSegmentProperties;
+  /**
+   * final merger for merge sort
+   */
+  private SingleThreadFinalSortFilesMerger finalMerger;
+  /**
+   * data handler for writing the sorted and merged data to the carbon store
+   */
+  private CarbonFactDataHandlerColumnar dataHandler;
+  /**
+   * column cardinality
+   */
+  private int[] columnCardinality;
+  /**
+   * fact table to index table column mapping order
+   */
+  private int[] factToIndexColumnMapping;
+  /**
+   * fact table dictionary column to index table dictionary column mapping
+   */
+  private int[] factToIndexDictColumnMapping;
+  /**
+   * boolean mapping for no dictionary columns in schema
+   */
+  private boolean[] noDictionaryColMapping;
+  /**
+   * agg type defined for measures
+   */
+  private char[] aggType;
+  /**
+   * segment id
+   */
+  private String segmentId;
+  /**
+   * index table name
+   */
+  private String indexTableName;
+  /**
+   * temp store location to be used during data load
+   */
+  private String tempStoreLocation;
+  /**
+   * database name
+   */
+  private String databaseName;
+  /**
+   * no dictionary column count in schema
+   */
+  private int noDictionaryCount;
+  /**
+   * implicit column count in schema
+   */
+  private int implicitColumnCount;
+  /**
+   * total count of measures in schema
+   */
+  private int measureCount;
+  /**
+   * dimension count excluding the complex dimension and no dictionary column counts
+   */
+  private int dimensionColumnCount;
+  /**
+   * complex dimension count in schema
+   */
+  private int complexDimensionCount;
+  /**
+   * carbon table
+   */
+  private CarbonTable carbonTable;
+  /**
+   * whether the allocated tasks have found any record
+   */
+  private boolean isRecordFound;
+
+  /**
+   * @param carbonLoadModel load model containing the details required for data load
+   * @param columnCardinality cardinality of each dictionary column
+   * @param segmentId id of the segment being compacted
+   * @param indexTableName name of the index table
+   * @param factToIndexColumnMapping fact table to index table column mapping order
+   * @param factToIndexDictColumnMapping fact table to index table dictionary column mapping
+   */
+  public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, int[] columnCardinality,
+      String segmentId, String indexTableName, int[] factToIndexColumnMapping,
+      int[] factToIndexDictColumnMapping) {
+    this.carbonLoadModel = carbonLoadModel;
+    this.columnCardinality = columnCardinality;
+    this.segmentId = segmentId;
+    this.indexTableName = indexTableName;
+    this.databaseName = carbonLoadModel.getDatabaseName();
+    this.factToIndexColumnMapping = factToIndexColumnMapping;
+    this.factToIndexDictColumnMapping = factToIndexDictColumnMapping;
+    initSegmentProperties();
+  }
+
+  /**
+   * This method will iterate over the query result and convert it into a format
+   * compatible with data loading
+   *
+   * @param detailQueryResultIteratorList list of iterators over the detail query result
+   */
+  public void processQueryResult(List<CarbonIterator<BatchResult>> detailQueryResultIteratorList)
+      throws Exception {
+    try {
+      initTempStoreLocation();
+      initSortDataRows();
+      processResult(detailQueryResultIteratorList);
+      // if no records are fetched from a split (e.g. after a delete command),
+      // the steps below need not be initialized
+      if (isRecordFound) {
+        initializeFinalThreadMergerForMergeSort();
+        initDataHandler();
+        readAndLoadDataFromSortTempFiles();
+      }
+    } finally {
+      // clear temp files and folders created during compaction
+      deleteTempStoreLocation();
+    }
+  }
+
+  /**
+   * This method will clean up the local folders and files created during compaction
+   */
+  private void deleteTempStoreLocation() {
+    if (null != tempStoreLocation) {
+      try {
+        CarbonUtil.deleteFoldersAndFiles(new File[] { new File(tempStoreLocation) });
+      } catch (IOException | InterruptedException e) {
+        LOGGER.error(
+            "Problem deleting local folders during secondary index creation: " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * This method will iterate over the query result and perform row sorting operation
+   *
+   * @param detailQueryResultIteratorList list of iterators over the detail query result
+   */
+  private void processResult(List<CarbonIterator<BatchResult>> detailQueryResultIteratorList)
+      throws Exception {
+    for (CarbonIterator<BatchResult> detailQueryIterator : detailQueryResultIteratorList) {
+      while (detailQueryIterator.hasNext()) {
+        BatchResult batchResult = detailQueryIterator.next();
+        while (batchResult.hasNext()) {
+          addRowForSorting(prepareRowObjectForSorting(batchResult.next()));
+          isRecordFound = true;
+        }
+      }
+    }
+    try {
+      sortDataRows.startSorting();
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e);
+      throw new Exception("Problem loading data while creating secondary index: " + e.getMessage());
+    }
+  }
+
+  /**
+   * This method will prepare the row from the raw query result object for sorting
+   *
+   * @param row raw row from the query result
+   * @return row re-ordered and converted for the sort step
+   */
+  private Object[] prepareRowObjectForSorting(Object[] row) {
+    ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
+
+    List<CarbonDimension> dimensions = segmentProperties.getDimensions();
+    Object[] preparedRow = new Object[dimensions.size() + measureCount];
+
+    // convert the dictionary from MDKey to surrogate key
+    byte[] dictionaryKey = wrapper.getDictionaryKey();
+    long[] keyArray = srcSegmentProperties.getDimensionKeyGenerator().getKeyArray(dictionaryKey);
+    Object[] dictionaryValues = new Object[dimensionColumnCount + measureCount];
+    // Re-ordering is required as per index table column dictionary order,
+    // as output dictionary Byte Array is as per parent table schema order
+    for (int i = 0; i < keyArray.length; i++) {
+      dictionaryValues[factToIndexDictColumnMapping[i]] = Long.valueOf(keyArray[i]).intValue();
+    }
+
+    int noDictionaryIndex = 0;
+    int dictionaryIndex = 0;
+    int i = 0;
+    // loop excluding the last dimension, as the last one is the implicit column.
+    for (; i < dimensions.size() - 1; i++) {
+      CarbonDimension dims = dimensions.get(i);
+      if (dims.hasEncoding(Encoding.DICTIONARY)) {
+        // dictionary
+        preparedRow[i] = dictionaryValues[dictionaryIndex++];
+      } else {
+        // no dictionary dims
+        preparedRow[i] = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+      }
+    }
+
+    // finally, add the implicit column position reference (PID)
+    preparedRow[i] = wrapper.getImplicitColumnByteArray();
+    return preparedRow;
+  }
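+
+  /*
+   * Worked example of the re-ordering above (values are hypothetical): if the
+   * parent table dictionary order is (c1, c2, c3) and the index table order is
+   * (c2, c3, c1), then factToIndexDictColumnMapping = {2, 0, 1}. For a key array
+   * {10, 20, 30} unpacked in parent order, the mapping loop produces
+   * dictionaryValues = {20, 30, 10}, i.e. the index table order expected by the
+   * sort step.
+   */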
+
+  /**
+   * This method will read sort temp files, perform merge sort and add it to store for data loading
+   */
+  private void readAndLoadDataFromSortTempFiles() throws Exception {
+    try {
+      Object[] previousRow = null;
+      finalMerger.startFinalMerge();
+      while (finalMerger.hasNext()) {
+        Object[] rowRead = finalMerger.next();
+        // convert the row from surrogate key to MDKey using the no-kettle conversion
+        Object[] outputRow = processNoKettle(rowRead, segmentProperties, aggType, measureCount,
+            noDictionaryCount, complexDimensionCount);
+        dataHandler.addDataToStore(outputRow);
+      }
+      dataHandler.finish();
+    } catch (CarbonDataWriterException e) {
+      LOGGER.error(e);
+      throw new Exception("Problem loading data during compaction: " + e.getMessage());
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new Exception("Problem loading data during compaction: " + e.getMessage());
+    } finally {
+      if (null != dataHandler) {
+        try {
+          dataHandler.closeHandler();
+        } catch (CarbonDataWriterException e) {
+          LOGGER.error(e);
+          throw new Exception("Problem loading data during compaction: " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * This method is used to process the row without kettle.
+   *
+   * @param row               input row
+   * @param segmentProperties segment properties of the table
+   * @param aggType           aggregation type for each measure
+   * @param measureCount      number of measures
+   * @param noDictionaryCount number of no dictionary columns
+   * @param complexDimCount   number of complex dimensions
+   * @return converted row ready to be added to the store
+   * @throws KettleException
+   */
+  public static Object[] processNoKettle(Object[] row, SegmentProperties segmentProperties,
+      char[] aggType, int measureCount, int noDictionaryCount, int complexDimCount)
+      throws KettleException {
+
+    // NOTE: the zero values below are placeholders; the commented IgnoreDictionary
+    // based indices show the intended positions in the row once the kettle flow
+    // is fully removed.
+    //    int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
+    //    int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
+    //    int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
+    int measureIndex = 0;
+    int noDimByteArrayIndex = 0;
+    int dimsArrayIndex = 0;
+
+    Object[] outputRow;
+    // adding one for the high cardinality dims byte array.
+    if (noDictionaryCount > 0 || complexDimCount > 0) {
+      outputRow = new Object[measureCount + 1 + 1];
+    } else {
+      outputRow = new Object[measureCount + 1];
+    }
+
+    int l = 0;
+    int index = 0;
+    Object[] measures = (Object[]) row[measureIndex];
+    for (int i = 0; i < measureCount; i++) {
+      outputRow[l++] = measures[index++];
+    }
+    outputRow[l] = row[noDimByteArrayIndex];
+
+    int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
+    int[] dimsArray = (int[]) row[dimsArrayIndex];
+    // copy the dictionary dimension values which take part in MDKey generation
+    System.arraycopy(dimsArray, 0, highCardExcludedRows, 0, highCardExcludedRows.length);
+    try {
+      outputRow[outputRow.length - 1] =
+          segmentProperties.getDimensionKeyGenerator().generateKey(highCardExcludedRows);
+    } catch (KeyGenException e) {
+      throw new KettleException("unable to generate the mdkey", e);
+    }
+    return outputRow;
+  }
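+
+  /*
+   * A sketch of the row shapes assumed by the logic above (the index values are
+   * currently placeholders, see the note at the top of the method):
+   * input  row: { measures Object[], no-dictionary byte arrays, dictionary int[] }
+   * output row: { measure_0 .. measure_(n-1), no-dictionary byte array, MDKey }
+   * e.g. with measureCount = 2 and one no-dictionary column, outputRow has
+   * length 4: two measures, the high cardinality byte array and the generated MDKey.
+   */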
+
+  /**
+   * initialise segment properties
+   */
+  private void initSegmentProperties() {
+    CarbonTable carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + indexTableName);
+    List<ColumnSchema> columnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(indexTableName),
+            carbonTable.getMeasureByTableName(indexTableName));
+    segmentProperties = new SegmentProperties(columnSchemaList, columnCardinality);
+    srcSegmentProperties =
+        new SegmentProperties(getParentColumnOrder(columnSchemaList), getParentOrderCardinality());
+  }
+
+  /**
+   * Convert index table column order into parent table column order
+   */
+  private List<ColumnSchema> getParentColumnOrder(List<ColumnSchema> columnSchemaList) {
+    List<ColumnSchema> parentColumnList = new ArrayList<ColumnSchema>(columnSchemaList.size());
+    for (int i = 0; i < columnSchemaList.size(); i++) {
+      // extra columns are the dummy_measure and the positionId implicit column
+      if (i >= columnCardinality.length) {
+        parentColumnList.add(columnSchemaList.get(i));
+      } else {
+        parentColumnList.add(columnSchemaList.get(factToIndexColumnMapping[i]));
+      }
+    }
+    return parentColumnList;
+  }
+
+  /**
+   * Convert index table column cardinality order into parent table column order
+   */
+  private int[] getParentOrderCardinality() {
+    int[] parentColumnCardinality = new int[columnCardinality.length];
+    for (int i = 0; i < columnCardinality.length; i++) {
+      parentColumnCardinality[i] = columnCardinality[factToIndexColumnMapping[i]];
+    }
+    return parentColumnCardinality;
+  }
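+
+  /*
+   * Example with hypothetical values: for factToIndexColumnMapping = {1, 0} and an
+   * index-order cardinality of {100, 50}, the method above returns {50, 100},
+   * since parentColumnCardinality[i] = columnCardinality[factToIndexColumnMapping[i]].
+   */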
+
+  /**
+   * add row to a temp array which will be written to a sort temp file after sorting
+   *
+   * @param row
+   */
+  private void addRowForSorting(Object[] row) throws Exception {
+    try {
+      // add the prepared row to the sortDataRows instance for sorting
+      sortDataRows.addRow(row);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e);
+      throw new Exception(
+          "Row addition for sorting failed while creating secondary index: " + e.getMessage());
+    }
+  }
+
+  /**
+   * create an instance of sort data rows
+   */
+  private void initSortDataRows() throws Exception {
+    CarbonTable indexTable = CarbonMetadata.getInstance().getCarbonTable(
+        carbonLoadModel.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + indexTableName);
+    measureCount = indexTable.getMeasureByTableName(indexTableName).size();
+    implicitColumnCount = indexTable.getImplicitDimensionByTableName(indexTableName).size();
+    SortObserver observer = new SortObserver();
+    List<CarbonDimension> dimensions = indexTable.getDimensionByTableName(indexTableName);
+    noDictionaryColMapping = new boolean[dimensions.size()];
+    int i = 0;
+    for (CarbonDimension dimension : dimensions) {
+      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        i++;
+        continue;
+      }
+      noDictionaryColMapping[i++] = true;
+      noDictionaryCount++;
+    }
+    dimensionColumnCount = dimensions.size();
+    SortParameters parameters = createSortParameters();
+    SortIntermediateFileMerger intermediateFileMerger = new SortIntermediateFileMerger(parameters);
+    this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
+    try {
+      this.sortDataRows.initialize();
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e);
+      throw new Exception(
+          "Error initializing sort data rows object while creating secondary index: " + e
+              .getMessage());
+    }
+  }
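+
+  /*
+   * Example of the mapping built above for a hypothetical schema with dimensions
+   * (d1 DICTIONARY, d2 no-dictionary, d3 DICTIONARY): the loop yields
+   * noDictionaryColMapping = {false, true, false} and noDictionaryCount = 1.
+   */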
+
+  /**
+   * This method will create the sort parameters VO object
+   *
+   * @return sort parameters VO object
+   */
+  private SortParameters createSortParameters() {
+    return SortParameters
+        .createSortParameters(databaseName, indexTableName, dimensionColumnCount,
+            complexDimensionCount, measureCount, noDictionaryCount,
+            carbonLoadModel.getPartitionId(), segmentId, carbonLoadModel.getTaskNo(),
+            noDictionaryColMapping);
+  }
+
+  /**
+   * create an instance of the final thread merger which will perform a merge sort on all the
+   * sort temp files
+   */
+  private void initializeFinalThreadMergerForMergeSort() {
+    String sortTempFileLocation = tempStoreLocation + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+    initAggType();
+    // kettle will not be used
+    boolean useKettle = false;
+    finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation, indexTableName,
+        dimensionColumnCount, complexDimensionCount, measureCount, noDictionaryCount, aggType,
+        noDictionaryColMapping, useKettle);
+  }
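+
+  /*
+   * The composed path above is <tempStoreLocation>/<SORT_TEMP_FILE_LOCATION>,
+   * where both the separator and the sort temp folder name come from
+   * CarbonCommonConstants; the final merger reads every sort temp file from
+   * this single location.
+   */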
+
+  /**
+   * initialise carbon data writer instance
+   */
+  private void initDataHandler() throws Exception {
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = getCarbonFactDataHandlerModel();
+    carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(Integer.parseInt(carbonLoadModel.getTaskNo()),
+            carbonLoadModel.getFactTimeStamp());
+    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+    if (segmentProperties.getNumberOfNoDictionaryDimension() > 0
+        || segmentProperties.getComplexDimensions().size() > 0) {
+      carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
+    } else {
+      carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
+    }
+    carbonFactDataHandlerModel.setColCardinality(columnCardinality);
+    carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+    // NO-Kettle.
+    carbonFactDataHandlerModel.setUseKettle(false);
+    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+    try {
+      dataHandler.initialise();
+    } catch (CarbonDataWriterException e) {
+      LOGGER.error(e);
+      throw new Exception(
+          "Problem initialising data handler while creating secondary index: " + e.getMessage());
+    }
+  }
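+
+  /*
+   * The mdKeyIndex set above points at the MDKey slot of the converted row
+   * produced by processNoKettle: e.g. with 2 measures and a no-dictionary column
+   * the row is {m0, m1, no-dictionary byte array, MDKey}, so the index is
+   * measureCount + 1 = 3.
+   */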
+
+  /**
+   * This method will create a model object for carbon fact data handler
+   *
+   * @return carbon fact data handler model
+   */
+  private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel() {
+    // NOTE: model creation is not wired up in this change yet and the method
+    // currently returns null; the commented call below shows the intended source.
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = null;
+    //    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonLoaderUtil
+    //        .getCarbonFactDataHandlerModel(carbonLoadModel, segmentProperties, databaseName,
+    //            indexTableName, tempStoreLocation, carbonLoadModel.getStorePath());
+    return carbonFactDataHandlerModel;
+  }
+
+  /**
+   * initialise temporary store location
+   */
+  private void initTempStoreLocation() {
+    tempStoreLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(databaseName, indexTableName, carbonLoadModel.getTaskNo(),
+            carbonLoadModel.getPartitionId(), segmentId, false);
+  }
+
+  /**
+   * initialise aggregation type for measures for their storage format
+   */
+  private void initAggType() {
+    aggType = new char[measureCount];
+    Arrays.fill(aggType, 'n');
+    carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + indexTableName);
+    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(indexTableName);
+    for (int i = 0; i < measureCount; i++) {
+      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
+    }
+  }
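+
+  /*
+   * Sketch of the result (the exact characters depend on DataTypeUtil.getAggType):
+   * the array is first filled with the default 'n' and then overwritten per
+   * measure, so a table with two plain numeric measures would yield
+   * aggType = {'n', 'n'} under that assumption.
+   */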
+}