You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/03/30 05:12:11 UTC

[03/13] incubator-carbondata git commit: Removed kettle related code and refactored
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/
deleted file mode 100644
index f037cf0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/
+++ /dev/null
@@ -1,2009 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.charset.Charset;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.StandardLogService;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.writer.ByteArrayHolder;
-import org.apache.carbondata.core.writer.HierarchyValueWriterForCSV;
-import org.apache.carbondata.processing.constants.LoggerAction;
-import org.apache.carbondata.processing.dataprocessor.manager.CarbonDataProcessorManager;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.mdkeygen.file.FileData;
-import org.apache.carbondata.processing.mdkeygen.file.FileManager;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
-import org.apache.carbondata.processing.schema.metadata.ColumnSchemaDetails;
-import org.apache.carbondata.processing.schema.metadata.ColumnSchemaDetailsWrapper;
-import org.apache.carbondata.processing.schema.metadata.ColumnsInfo;
-import org.apache.carbondata.processing.schema.metadata.HierarchiesInfo;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
-import static org.apache.carbondata.processing.constants.TableOptionConstant.BAD_RECORDS_ACTION;
-import static org.apache.carbondata.processing.constants.TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE;
-import static org.apache.carbondata.processing.constants.TableOptionConstant.SERIALIZATION_NULL_FORMAT;
-import org.pentaho.di.core.exception.KettleException;
-import org.pentaho.di.core.row.RowMetaInterface;
-import org.pentaho.di.core.row.ValueMeta;
-import org.pentaho.di.core.row.ValueMetaInterface;
-import org.pentaho.di.trans.Trans;
-import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.BaseStep;
-import org.pentaho.di.trans.step.StepDataInterface;
-import org.pentaho.di.trans.step.StepMeta;
-import org.pentaho.di.trans.step.StepMetaInterface;
-public class CarbonCSVBasedSeqGenStep extends BaseStep {
-  /**
-   */
-  public static final String BYTE_ENCODING = "ISO-8859-1";
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonCSVBasedSeqGenStep.class.getName());
-  /**
-   */
-  private static final int NUM_CORES_DEFAULT_VAL = 2;
-  /**
-   * drivers
-   */
-  private static final Map<String, String> DRIVERS;
-  static {
-    DRIVERS = new HashMap<String, String>(16);
-    DRIVERS.put("oracle.jdbc.OracleDriver", CarbonCommonConstants.TYPE_ORACLE);
-    DRIVERS.put("com.mysql.jdbc.Driver", CarbonCommonConstants.TYPE_MYSQL);
-    DRIVERS.put("", CarbonCommonConstants.TYPE_MYSQL);
-    DRIVERS.put("", CarbonCommonConstants.TYPE_MSSQL);
-    DRIVERS.put("com.sybase.jdbc3.jdbc.SybDriver", CarbonCommonConstants.TYPE_SYBASE);
-  }
-  /**
-   * ReentrantLock getRowLock
-   */
-  private final Object getRowLock = new Object();
-  /**
-   * ReentrantLock putRowLock
-   */
-  private final Object putRowLock = new Object();
-  /**
-   * CarbonSeqGenData
-   */
-  private CarbonCSVBasedSeqGenData data;
-  /**
-   * CarbonSeqGenStepMeta1
-   */
-  private CarbonCSVBasedSeqGenMeta meta;
-  /**
-   * Map of Connection
-   */
-  private Map<String, Connection> cons = new HashMap<>(16);
-  /**
-   * Csv file path
-   */
-  private String csvFilepath;
-  /**
-   * badRecordsLogger
-   */
-  private BadRecordsLogger badRecordsLogger;
-  /**
-   * Normalized Hier and HierWriter map
-   */
-  private Map<String, HierarchyValueWriterForCSV> nrmlizedHierWriterMap =
-      new HashMap<String, HierarchyValueWriterForCSV>(16);
-  /**
-   * load Folder location
-   */
-  private String loadFolderLoc;
-  /**
-   * File manager
-   */
-  private IFileManagerComposite filemanager;
-  /**
-   * measureCol
-   */
-  private List<String> measureCol;
-  /**
-   * dimPresentCsvOrder - Dim present In CSV order
-   */
-  private boolean[] dimPresentCsvOrder;
-  /**
-   * propMap
-   */
-  private Map<String, int[]> propMap;
-  /**
-   * resultArray
-   */
-  private Future[] resultArray;
-  /**
-   * denormHierarchies
-   */
-  private List<String> denormHierarchies;
-  /**
-   * readCounter
-   */
-  private long readCounter;
-  /**
-   * writeCounter
-   */
-  private long writeCounter;
-  /**
-   * logCounter
-   */
-  private int logCounter;
-  /**
-   * presentColumnMapIndex
-   */
-  private int[] presentColumnMapIndex;
-  /**
-   * measurePresentMapping
-   */
-  private boolean[] measurePresentMapping;
-  /**
-   * measureSurrogateReqMapping
-   */
-  private boolean[] measureSurrogateReqMapping;
-  /**
-   * foreignKeyMappingColumns
-   */
-  private String[] foreignKeyMappingColumns;
-  /**
-   * foreignKeyMappingColumns
-   */
-  private String[][] foreignKeyMappingColumnsForMultiple;
-  /**
-   * Meta column names
-   */
-  private String[] metaColumnNames;
-  /**
-   * duplicateColMapping
-   */
-  private int[][] duplicateColMapping;
-  private ExecutorService exec;
-  /**
-   * threadStatusObserver
-   */
-  private ThreadStatusObserver threadStatusObserver;
-  /**
-   * CarbonCSVBasedDimSurrogateKeyGen
-   */
-  private CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen;
-  private DataType[] msrDataType;
-  /**
-   * wrapper object having the columnSchemaDetails
-   */
-  private ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper;
-  /**
-   * to check whether column is a no dicitonary column or not
-   */
-  private boolean[] isNoDictionaryColumn;
-  /**
-   * to check whether column is a no dicitonary column or not
-   */
-  private boolean[] isStringDataType;
-  /**
-   * to check whether column is a no dicitonary column or not
-   */
-  private String[] dataTypes;
-  /**
-   * to check whether column is complex type column or not
-   */
-  private boolean[] isComplexTypeColumn;
-  /**
-   * to store index of no dictionapry column
-   */
-  private int[] noDictionaryAndComplexIndexMapping;
-  private GenericDataType[] complexTypes;
-  private DirectDictionaryGenerator[] directDictionaryGenerators;
-  /**
-   * dimension column ids
-   */
-  private String[] dimensionColumnIds;
-  private Trans dis;
-  /**
-   * Constructor
-   *
-   * @param s
-   * @param stepDataInterface
-   * @param c
-   * @param t
-   * @param dis
-   */
-  public CarbonCSVBasedSeqGenStep(StepMeta s, StepDataInterface stepDataInterface, int c,
-      TransMeta t, Trans dis) {
-    super(s, stepDataInterface, c, t, dis);
-    csvFilepath = dis.getVariable("csvInputFilePath");
-    this.dis = dis;
-  }
-  /**
-   * processRow
-   */
-  public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
-    try {
-      meta = (CarbonCSVBasedSeqGenMeta) smi;
-      StandardLogService.setThreadName(meta.getPartitionID(), null);
-      data = (CarbonCSVBasedSeqGenData) sdi;
-      Object[] r = getRow();  // get row, blocks when needed!
-      if (first) {
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-                .recordGeneratingDictionaryValuesTime(meta.getPartitionID(),
-                        System.currentTimeMillis());
-        first = false;
-        meta.initialize();
-        final Object dataProcessingLockObject = CarbonDataProcessorManager.getInstance()
-            .getDataProcessingLockObject(meta.getDatabaseName() + '_' + meta.getTableName());
-        synchronized (dataProcessingLockObject) {
-          // observer of writing file in thread
-          this.threadStatusObserver = new ThreadStatusObserver();
-          if (csvFilepath == null) {
-            //                    isDBFactLoad = true;
-            csvFilepath = meta.getTableName();
-          }
-          if (null == measureCol) {
-            measureCol = Arrays.asList(meta.measureColumn);
-          }
-          // Update the Null value comparer and update the String against which we need
-          // to check the values coming from the previous step.
-          logCounter =
-              Integer.parseInt(CarbonCommonConstants.DATA_LOAD_LOG_COUNTER_DEFAULT_COUNTER);
-          if (null != getInputRowMeta()) {
-            meta.updateHierMappings(getInputRowMeta());
-            populateCarbonMeasures(meta.measureColumn);
-            meta.msrMapping = getMeasureOriginalIndexes(meta.measureColumn);
-            meta.memberMapping = getMemberMappingOriginalIndexes();
-            data.setInputSize(getInputRowMeta().size());
-            updatePropMap(meta.actualDimArray);
-            if (meta.isAggregate()) {
-              presentColumnMapIndex = createPresentColumnMapIndexForAggregate();
-            } else {
-              presentColumnMapIndex = createPresentColumnMapIndex();
-            }
-            measurePresentMapping = createMeasureMappigs(measureCol);
-            measureSurrogateReqMapping = createMeasureSurrogateReqMapping();
-            createForeignKeyMappingColumns();
-            metaColumnNames = createColumnArrayFromMeta();
-          }
-          if (!meta.isAggregate()) {
-            updateHierarchyKeyGenerators(data.getKeyGenerators(), meta.hirches, meta.dimLens,
-                meta.dimColNames);
-          }
-          data.setGenerator(
-              KeyGeneratorFactory.getKeyGenerator(getUpdatedLens(meta.dimLens, meta.dimPresent)));
-          if (null != getInputRowMeta()) {
-            data.setOutputRowMeta((RowMetaInterface) getInputRowMeta().clone());
-          }
-          this.dimensionColumnIds = meta.getDimensionColumnIds();
-          ColumnsInfo columnsInfo = new ColumnsInfo();
-          columnsInfo.setDims(meta.dims);
-          columnsInfo.setDimColNames(meta.dimColNames);
-          columnsInfo.setKeyGenerators(data.getKeyGenerators());
-          columnsInfo.setDatabaseName(meta.getDatabaseName());
-          columnsInfo.setTableName(meta.getTableName());
-          columnsInfo.setHierTables(meta.hirches.keySet());
-          columnsInfo.setBatchSize(meta.getBatchSize());
-          columnsInfo.setStoreType(meta.getStoreType());
-          columnsInfo.setAggregateLoad(meta.isAggregate());
-          columnsInfo.setMaxKeys(meta.dimLens);
-          columnsInfo.setPropColumns(meta.getPropertiesColumns());
-          columnsInfo.setPropIndx(meta.getPropertiesIndices());
-          columnsInfo.setTimeOrdinalCols(meta.timeOrdinalCols);
-          columnsInfo.setPropTypes(meta.getPropTypes());
-          columnsInfo.setTimDimIndex(meta.timeDimeIndex);
-          columnsInfo.setDimHierRel(meta.getDimTableArray());
-          columnsInfo.setBaseStoreLocation(getCarbonLocalBaseStoreLocation());
-          columnsInfo.setTableName(meta.getTableName());
-          columnsInfo.setPrimaryKeyMap(meta.getPrimaryKeyMap());
-          columnsInfo.setMeasureColumns(meta.measureColumn);
-          columnsInfo.setComplexTypesMap(meta.getComplexTypes());
-          columnsInfo.setDimensionColumnIds(this.dimensionColumnIds);
-          columnsInfo.setColumnSchemaDetailsWrapper(meta.getColumnSchemaDetailsWrapper());
-          columnsInfo.setColumnProperties(meta.getColumnPropertiesMap());
-          updateBagLogFileName();
-          columnsInfo.setTimeOrdinalIndices(meta.timeOrdinalIndices);
-          surrogateKeyGen = new FileStoreSurrogateKeyGenForCSV(columnsInfo, meta.getPartitionID(),
-              meta.getSegmentId(), meta.getTaskNo());
-          data.setSurrogateKeyGen(surrogateKeyGen);
-          updateStoreLocation();
-          // Check the insert hierarchies required or not based on that
-          // Create the list which will hold the hierarchies required to be created
-          // i.e. denormalized hierarchies.
-          if (null != getInputRowMeta()) {
-            denormHierarchies = getDenormalizedHierarchies();
-          }
-          if (null != getInputRowMeta()) {
-            // We consider that there is no time dimension,in these case
-            // the
-            // timeIndex = -1
-            ValueMetaInterface[] out = null;
-            out = new ValueMetaInterface[meta.normLength + meta.msrMapping.length];
-            int outCounter = 0;
-            for (int i = 0; i < meta.actualDimArray.length; i++) {
-              if (meta.dimPresent[i]) {
-                ValueMetaInterface x =
-                    new ValueMeta(meta.actualDimArray[i], ValueMetaInterface.TYPE_STRING,
-                        ValueMetaInterface.STORAGE_TYPE_BINARY_STRING);
-                x.setStorageMetadata(
-                    (new ValueMeta(meta.actualDimArray[i], ValueMetaInterface.TYPE_STRING,
-                        ValueMetaInterface.STORAGE_TYPE_NORMAL)));
-                x.setStringEncoding(BYTE_ENCODING);
-                x.setStringEncoding(BYTE_ENCODING);
-                x.getStorageMetadata().setStringEncoding(BYTE_ENCODING);
-                out[outCounter] = x;
-                outCounter++;
-              }
-            }
-            for (int j = 0; j < meta.measureColumn.length; j++) {
-              for (int k = 0; k < data.getOutputRowMeta().size(); k++) {
-                if (meta.measureColumn[j]
-                    .equalsIgnoreCase(data.getOutputRowMeta().getValueMeta(k).getName())) {
-                  out[outCounter] =
-                      new ValueMeta(meta.measureColumn[j], ValueMetaInterface.TYPE_NUMBER,
-                          ValueMetaInterface.STORAGE_TYPE_NORMAL);
-                  out[outCounter].setStorageMetadata(
-                      new ValueMeta(meta.measureColumn[j], ValueMetaInterface.TYPE_NUMBER,
-                          ValueMetaInterface.STORAGE_TYPE_NORMAL));
-                  outCounter++;
-                  break;
-                }
-              }
-            }
-            data.getOutputRowMeta().setValueMetaList(Arrays.asList(out));
-          }
-        }
-        columnSchemaDetailsWrapper = meta.getColumnSchemaDetailsWrapper();
-        if (null != getInputRowMeta()) {
-          generateNoDictionaryAndComplexIndexMapping();
-          data.getSurrogateKeyGen()
-              .setDimensionOrdinalToDimensionMapping(populateNameToCarbonDimensionMap());
-        }
-        serializationNullFormat =
-            meta.getTableOptionWrapper().get(SERIALIZATION_NULL_FORMAT.getName());
-        boolean badRecordsLoggerEnable;
-        boolean badRecordsLogRedirect = false;
-        boolean badRecordConvertNullDisable = false;
-        boolean isDataLoadFail = false;
-        badRecordsLoggerEnable = Boolean
-            .parseBoolean(meta.getTableOptionWrapper().get(BAD_RECORDS_LOGGER_ENABLE.getName()));
-        String bad_records_action =
-            meta.getTableOptionWrapper().get(BAD_RECORDS_ACTION.getName());
-        if (null != bad_records_action) {
-          LoggerAction loggerAction = null;
-          try {
-            loggerAction = LoggerAction.valueOf(bad_records_action.toUpperCase());
-          } catch (IllegalArgumentException e) {
-            loggerAction = LoggerAction.FORCE;
-          }
-          switch (loggerAction) {
-            case FORCE:
-              badRecordConvertNullDisable = false;
-              break;
-            case REDIRECT:
-              badRecordsLogRedirect = true;
-              badRecordConvertNullDisable = true;
-              break;
-            case IGNORE:
-              badRecordsLogRedirect = false;
-              badRecordConvertNullDisable = true;
-              break;
-            case FAIL:
-              isDataLoadFail = true;
-              break;
-          }
-        }
-        String key = meta.getDatabaseName() + '/' + meta.getTableName() +
-            '_' + meta.getTableName();
-        badRecordsLogger = new BadRecordsLogger(key, csvFilepath, getBadLogStoreLocation(
-            meta.getDatabaseName() + '/' + meta.getTableName() + "/" + meta.getTaskNo()),
-            badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable,
-            isDataLoadFail);
-        HashMap<String, String> dateformatsHashMap = new HashMap<String, String>();
-        if (meta.dateFormat != null) {
-          String[] dateformats = meta.dateFormat.split(CarbonCommonConstants.COMMA);
-          for (String dateFormat:dateformats) {
-            String[] dateFormatSplits = dateFormat.split(":", 2);
-            dateformatsHashMap.put(dateFormatSplits[0].toLowerCase().trim(),
-                dateFormatSplits[1].trim());
-          }
-        }
-        String[] DimensionColumnIds = meta.getDimensionColumnIds();
-        directDictionaryGenerators =
-            new DirectDictionaryGenerator[DimensionColumnIds.length];
-        for (int i = 0; i < DimensionColumnIds.length; i++) {
-          ColumnSchemaDetails columnSchemaDetails = columnSchemaDetailsWrapper.get(
-              DimensionColumnIds[i]);
-          if (columnSchemaDetails.isDirectDictionary()) {
-            String columnName = columnSchemaDetails.getColumnName();
-            DataType columnType = columnSchemaDetails.getColumnType();
-            if (dateformatsHashMap.containsKey(columnName)) {
-              directDictionaryGenerators[i] =
-                  DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
-                      columnType, dateformatsHashMap.get(columnName));
-            } else {
-              directDictionaryGenerators[i] =
-                  DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(columnType);
-            }
-          }
-        }
-      }
-      // no more input to be expected...
-      if (r == null) {
-        return processWhenRowIsNull();
-      }
-      // proecess the first
-      Object[] out = process(r);
-      readCounter++;
-      if (null != out) {
-        writeCounter++;
-        putRow(data.getOutputRowMeta(), out);
-      }
-      // start multi-thread to process
-      int numberOfNodes;
-      try {
-        numberOfNodes = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-                CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      } catch (NumberFormatException exc) {
-        numberOfNodes = NUM_CORES_DEFAULT_VAL;
-      }
-      startReadingProcess(numberOfNodes);
-      badRecordsLogger.closeStreams();
-      if (!meta.isAggregate()) {
-        closeNormalizedHierFiles();
-      }
-      if (writeCounter == 0) {
-        return processWhenRowIsNull();
-      }
-      CarbonUtil.writeLevelCardinalityFile(loadFolderLoc, meta.getTableName(),
-          getUpdatedCardinality());
-"Record Procerssed For table: " + meta.getTableName());
-      String logMessage =
-          "Summary: Carbon CSV Based Seq Gen Step : " + readCounter + ": Write: " + writeCounter;
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordGeneratingDictionaryValuesTime(
-          meta.getPartitionID(), System.currentTimeMillis());
-      setOutputDone();
-    } catch (RuntimeException ex) {
-      LOGGER.error(ex);
-      throw ex;
-    } catch (Exception ex) {
-      LOGGER.error(ex);
-      throw new RuntimeException(ex);
-    }
-    return false;
-  }
-  private void generateNoDictionaryAndComplexIndexMapping() {
-    isNoDictionaryColumn = new boolean[metaColumnNames.length];
-    isComplexTypeColumn = new boolean[metaColumnNames.length];
-    noDictionaryAndComplexIndexMapping = new int[metaColumnNames.length];
-    isStringDataType = new boolean[metaColumnNames.length];
-    dataTypes = new String[metaColumnNames.length];
-    complexTypes = new GenericDataType[meta.getComplexTypeColumns().length];
-    for (int i = 0; i < meta.noDictionaryCols.length; i++) {
-      for (int j = 0; j < metaColumnNames.length; j++) {
-        if (CarbonCommonConstants.STRING
-            .equalsIgnoreCase(meta.dimColDataTypes.get(metaColumnNames[j]))) {
-          isStringDataType[j] = true;
-        }
-        dataTypes[j] = meta.dimColDataTypes.get(metaColumnNames[j].toLowerCase());
-        if (meta.noDictionaryCols[i].equalsIgnoreCase(
-            meta.getTableName() + CarbonCommonConstants.UNDERSCORE + metaColumnNames[j])) {
-          isNoDictionaryColumn[j] = true;
-          noDictionaryAndComplexIndexMapping[j] = i;
-          break;
-        }
-      }
-    }
-    for (int i = 0; i < meta.getComplexTypeColumns().length; i++) {
-      for (int j = 0; j < metaColumnNames.length; j++) {
-        if (meta.getComplexTypeColumns()[i].equalsIgnoreCase(metaColumnNames[j])) {
-          isComplexTypeColumn[j] = true;
-          complexTypes[i] = meta.complexTypes.get(meta.getComplexTypeColumns()[i]);
-          noDictionaryAndComplexIndexMapping[j] = i + meta.noDictionaryCols.length;
-          break;
-        }
-      }
-    }
-  }
-  private void startReadingProcess(int numberOfNodes) throws KettleException, InterruptedException {
-    startProcess(numberOfNodes);
-  }
-  private boolean processWhenRowIsNull() throws KettleException {
-    // If first request itself is null then It will not enter the first block and
-    // in data surrogatekeygen will not be initialized so it can throw NPE.
-    if (data.getSurrogateKeyGen() == null) {
-      setOutputDone();
-"Record Procerssed For table: " + meta.getTableName());
-      String logMessage =
-          "Summary: Carbon CSV Based Seq Gen Step:  Read: " + readCounter + ": Write: "
-              + writeCounter;
-      return false;
-    }
-    setOutputDone();
-"Record Processed For table: " + meta.getTableName());
-    String logMessage =
-        "Summary: Carbon CSV Based Seq Gen Step:  Read: " + readCounter + ": Write: "
-            + writeCounter;
-    return false;
-  }
-  /**
-   * holds the value to be considered as null while dataload
-   */
-  private String serializationNullFormat;
-  private List<String> getDenormalizedHierarchies() {
-    List<String> hierList = Arrays.asList(meta.hierNames);
-    List<String> denormHiers = new ArrayList<String>(10);
-    for (Iterator<Entry<String, int[]>> iterator = meta.hirches.entrySet().iterator(); iterator
-        .hasNext(); ) {
-      Entry<String, int[]> entry =;
-      String name = entry.getKey();
-      if (hierList.contains(name)) {
-        continue;
-      } else if (entry.getValue().length > 1) {
-        denormHiers.add(name);
-      }
-    }
-    return denormHiers;
-  }
-  private void updatePropMap(String[] actualDimArray) {
-    if (null == propMap) {
-      propMap = new HashMap<String, int[]>(actualDimArray.length);
-    }
-    List<String> currentColNames = new ArrayList<String>(10);
-    for (int i = 0; i < getInputRowMeta().size(); i++) {
-      currentColNames.add(getInputRowMeta().getValueMeta(i).getName());
-    }
-    List<String> currentColName = new ArrayList<String>(actualDimArray.length);
-    for (int i = 0; i < getInputRowMeta().size(); i++) {
-      String columnName = getInputRowMeta().getValueMeta(i).getName();
-      String hier = meta.foreignKeyHierarchyMap.get(columnName);
-      if (null != hier) {
-        if (hier.indexOf(CarbonCommonConstants.COMA_SPC_CHARACTER) > -1) {
-          String[] splittedHiers = hier.split(CarbonCommonConstants.COMA_SPC_CHARACTER);
-          for (String hierName : splittedHiers) {
-            String tableName = meta.getHierDimTableMap().get(hier);
-            String[] cols = meta.hierColumnMap.get(hierName);
-            if (null != cols) {
-              for (String column : cols) {
-                currentColName.add(tableName + '_' + column);
-              }
-            }
-          }
-        } else {
-          String tableName = meta.getHierDimTableMap().get(hier);
-          String[] columns = meta.hierColumnMap.get(hier);
-          if (null != columns) {
-            for (String column : columns) {
-              currentColName.add(tableName + '_' + column);
-            }
-          }
-        }
-      } else
-      // then it can be direct column name if not foreign key.
-      {
-        currentColName.add(meta.getTableName() + '_' + columnName);
-      }
-    }
-    String[] currentColNamesArray = currentColName.toArray(new String[currentColName.size()]);
-    List<HierarchiesInfo> metahierVoList = meta.getMetahierVoList();
-    if (null == metahierVoList) {
-      return;
-    }
-    for (HierarchiesInfo hierInfo : metahierVoList) {
-      Map<String, String[]> columnPropMap = hierInfo.getColumnPropMap();
-      Set<Entry<String, String[]>> entrySet = columnPropMap.entrySet();
-      for (Entry<String, String[]> entry : entrySet) {
-        String[] propColmns = entry.getValue();
-        int[] index = getIndex(currentColNamesArray, propColmns);
-        propMap.put(entry.getKey(), index);
-      }
-    }
-  }
-  private int[] getIndex(String[] currentColNamesArray, String[] propColmns) {
-    int[] resultIndex = new int[propColmns.length];
-    for (int i = 0; i < propColmns.length; i++) {
-      for (int j = 0; j < currentColNamesArray.length; j++) {
-        if (propColmns[i].equalsIgnoreCase(currentColNamesArray[j])) {
-          resultIndex[i] = j;
-          break;
-        }
-      }
-    }
-    return resultIndex;
-  }
-  private void closeNormalizedHierFiles() throws KettleException {
-    if (null == filemanager) {
-      return;
-    }
-    int hierLen = filemanager.size();
-    for (int i = 0; i < hierLen; i++) {
-      FileData hierFileData = (FileData) filemanager.get(i);
-      String hierInProgressFileName = hierFileData.getFileName();
-      HierarchyValueWriterForCSV hierarchyValueWriter =
-          nrmlizedHierWriterMap.get(hierInProgressFileName);
-      if (null == hierarchyValueWriter) {
-        continue;
-      }
-      List<ByteArrayHolder> holders = hierarchyValueWriter.getByteArrayList();
-      Collections.sort(holders);
-      for (ByteArrayHolder holder : holders) {
-        hierarchyValueWriter.writeIntoHierarchyFile(holder.getMdKey(), holder.getPrimaryKey());
-      }
-      // now write the byte array in the file.
-      FileChannel bufferedOutStream = hierarchyValueWriter.getBufferedOutStream();
-      if (null == bufferedOutStream) {
-        continue;
-      }
-      CarbonUtil.closeStreams(bufferedOutStream);
-      hierInProgressFileName = hierFileData.getFileName();
-      int counter = hierarchyValueWriter.getCounter();
-      String storePath = hierFileData.getStorePath();
-      String changedFileName = hierInProgressFileName + (counter - 1);
-      hierInProgressFileName = changedFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-      File currentFile = new File(storePath + File.separator + hierInProgressFileName);
-      File destFile = new File(storePath + File.separator + changedFileName);
-      if (currentFile.exists()) {
-        boolean renameTo = currentFile.renameTo(destFile);
-        if (!renameTo) {
-"Not Able to Rename File : " + currentFile.getName());
-        }
-      }
-    }
-  }
-  /**
-   * Load Store location
-   */
-  private void updateStoreLocation() {
-    loadFolderLoc = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(meta.getDatabaseName(), meta.getTableName(), meta.getTaskNo(),
-            meta.getPartitionID(), meta.getSegmentId() + "", false);
-  }
-  private String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-    return badLogStoreLocation;
-  }
-  private void updateBagLogFileName() {
-    csvFilepath = new File(csvFilepath).getName();
-    if (csvFilepath.indexOf(".") > -1) {
-      csvFilepath = csvFilepath.substring(0, csvFilepath.indexOf("."));
-    }
-    csvFilepath = csvFilepath + '_' + System.currentTimeMillis();
-  }
-  private void startProcess(final int numberOfNodes) throws RuntimeException {
-    exec = Executors.newFixedThreadPool(numberOfNodes);
-    Callable<Void> callable = new Callable<Void>() {
-      @Override public Void call() throws RuntimeException {
-        StandardLogService
-            .setThreadName(StandardLogService.getPartitionID(meta.getTableName()), null);
-        try {
-            doProcess();
-        } catch (Throwable e) {
-          LOGGER.error(e, "Thread is terminated due to error");
-          threadStatusObserver.notifyFailed(e);
-        }
-        return null;
-      }
-    };
-    List<Future<Void>> results = new ArrayList<Future<Void>>(10);
-    for (int i = 0; i < numberOfNodes; i++) {
-      results.add(exec.submit(callable));
-    }
-    this.resultArray = results.toArray(new Future[results.size()]);
-    try {
-      for (int j = 0; j < this.resultArray.length; j++) {
-        this.resultArray[j].get();
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      throw new RuntimeException("Thread InterruptedException", e);
-    } finally {
-      exec.shutdownNow();
-    }
-  }
-  private int[] getUpdatedLens(int[] lens, boolean[] presentDims) {
-    int k = 0;
-    int[] integers = new int[meta.normLength];
-    for (int i = 0; i < lens.length; i++) {
-      if (presentDims[i]) {
-        integers[k] = lens[i];
-        k++;
-      }
-    }
-    return integers;
-  }
-  /**
-   * @return
-   */
-  private int[] getUpdatedCardinality() {
-    int[] maxSurrogateKeyArray = data.getSurrogateKeyGen().max;
-    List<Integer> dimCardWithComplex = new ArrayList<Integer>();
-    for (int i = 0; i < meta.dimColNames.length; i++) {
-      GenericDataType complexDataType =
-          meta.complexTypes.get(meta.dimColNames[i].substring(meta.getTableName().length() + 1));
-      if (complexDataType != null) {
-        complexDataType.fillCardinalityAfterDataLoad(dimCardWithComplex, maxSurrogateKeyArray);
-      } else {
-        dimCardWithComplex.add(maxSurrogateKeyArray[i]);
-      }
-    }
-    int[] complexDimCardinality = new int[dimCardWithComplex.size()];
-    for (int i = 0; i < dimCardWithComplex.size(); i++) {
-      complexDimCardinality[i] = dimCardWithComplex.get(i);
-    }
-    return complexDimCardinality;
-  }
-  private void doProcess() throws RuntimeException {
-    try {
-      for (DirectDictionaryGenerator directDictionaryGenerator: directDictionaryGenerators) {
-        if (directDictionaryGenerator != null) {
-          directDictionaryGenerator.initialize();
-        }
-      }
-      while (true) {
-        Object[] r = null;
-        synchronized (getRowLock) {
-          r = getRow();
-          readCounter++;
-        }
-        // no more input to be expected...
-        if (r == null) {
-          readCounter--;
-          break;
-        }
-        Object[] out = process(r);
-        if (null == out) {
-          continue;
-        }
-        synchronized (putRowLock) {
-          putRow(data.getOutputRowMeta(), out);
-          processRecord();
-          writeCounter++;
-        }
-      }
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-  private void processRecord() {
-    if (readCounter % logCounter == 0) {
-"Record Procerssed For table: " + meta.getTableName());
-      String logMessage = "Carbon Csv Based Seq Gen Step: Record Read : " + readCounter;
-    }
-  }
-  private String getCarbonLocalBaseStoreLocation() {
-    String tempLocationKey =
-        meta.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + meta.getTableName()
-            + CarbonCommonConstants.UNDERSCORE + meta.getTaskNo();
-    String strLoc = CarbonProperties.getInstance()
-        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-    File f = new File(strLoc);
-    String absoluteStorePath = f.getAbsolutePath();
-    return absoluteStorePath;
-  }
-  private Object[] process(Object[] r) throws RuntimeException {
-    try {
-      Object[] out = populateOutputRow(r);
-      if (out != null) {
-        for (int i = 0; i < meta.normLength - meta.complexTypes.size(); i++) {
-          if (null == NonDictionaryUtil.getDimension(i, out)) {
-            NonDictionaryUtil.setDimension(i, 1, out);
-          }
-        }
-      }
-      return out;
-    } catch (KettleException e) {
-      throw new RuntimeException(e);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-  private Object[] populateOutputRow(Object[] r) throws KettleException {
-    // Copy the dimension String values to output
-    int[] memberMapping = meta.memberMapping;
-    int inputColumnsSize = metaColumnNames.length;
-    boolean isGenerated = false;
-    int generatedSurrogate = -1;
-    //If CSV Exported from DB and we enter one row down then that row become empty.
-    // In that case it will have first value empty and other values will be null
-    // So If records is coming like this then we need to write this records as a bad Record.
-    if (null == r[0] && badRecordsLogger.isBadRecordConvertNullDisable()) {
-      badRecordsLogger
-          .addBadRecordsToBuilder(r, "Column Names are coming NULL");
-      return null;
-    }
-    Map<String, Dictionary> dictionaryCaches = surrogateKeyGen.getDictionaryCaches();
-    Object[] out =
-        new Object[meta.normLength + meta.msrs.length - meta.complexTypes.size()];
-    int dimLen = meta.dims.length;
-    Object[] newArray = new Object[CarbonCommonConstants.ARRAYSIZE];
-    ByteBuffer[] byteBufferArr = null;
-    if (null != meta.noDictionaryCols) {
-      byteBufferArr = new ByteBuffer[meta.noDictionaryCols.length + meta.complexTypes.size()];
-    }
-    int i = 0;
-    int index = 0;
-    int l = 0;
-    int msrCount = 0;
-    boolean isNull = false;
-    for (int j = 0; j < inputColumnsSize; j++) {
-      String columnName = metaColumnNames[j];
-      String foreignKeyColumnName = foreignKeyMappingColumns[j];
-      // check if it is ignore dictionary dimension or not . if yes directly write byte buffer
-      String tuple = null == r[j] ?
-          CarbonCommonConstants.MEMBER_DEFAULT_VAL :
-          (String) r[j];
-      // check whether the column value is the value to be  serialized as null.
-      boolean isSerialized = false;
-      if (tuple.equalsIgnoreCase(serializationNullFormat)) {
-        tuple = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
-        isSerialized = true;
-      }
-      if (isNoDictionaryColumn[j]) {
-        String dimensionValue =
-            processnoDictionaryDim(noDictionaryAndComplexIndexMapping[j], tuple, dataTypes[j],
-                isStringDataType[j], byteBufferArr);
-        if (!isSerialized && !isStringDataType[j] && CarbonCommonConstants.MEMBER_DEFAULT_VAL
-            .equals(dimensionValue)) {
-          failDataLoad(r, index, columnName, msrDataType[meta.msrMapping[msrCount]].name());
-          addEntryToBadRecords(r, j, columnName, dataTypes[j]);
-          if (badRecordsLogger.isBadRecordConvertNullDisable()) {
-            return null;
-          }
-        }
-        continue;
-      }
-      // There is a possibility that measure can be referred as dimensions also
-      // so in that case we need to just copy the value into the measure column index.
-      //if it enters here means 3 possibility
-      //1) this is not foreign key it can be direct columns
-      //2) This column present in the csv file but in the schema it is not present.
-      //3) This column can be measure column
-      if (measurePresentMapping[j]) {
-        String msr = tuple == null ? null : tuple.toString();
-        isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(msr);
-        if (measureSurrogateReqMapping[j] && !isNull) {
-          Integer surrogate = 0;
-          if (null == foreignKeyColumnName) {
-            // If foreignKeyColumnName is null till here that means this
-            // measure column is of type count and data type may be string
-            // so we have to create the surrogate key for the values.
-            surrogate = createSurrogateForMeasure(msr, columnName);
-            if (presentColumnMapIndex[j] > -1) {
-              isGenerated = true;
-              generatedSurrogate = surrogate;
-            }
-          } else {
-            surrogate = surrogateKeyGen.generateSurrogateKeys(msr, foreignKeyColumnName);
-          }
-          out[memberMapping[dimLen + index]] = surrogate.doubleValue();
-        } else if (!isSerialized &&  (isNull || msr == null
-            || msr.length() == 0)) {
-          failDataLoad(r, index, columnName,
-              msrDataType[meta.msrMapping[msrCount]].name());
-          addEntryToBadRecords(r, j, columnName,
-              msrDataType[meta.msrMapping[msrCount]].name());
-          if (badRecordsLogger.isBadRecordConvertNullDisable()) {
-            return null;
-          }
-        } else {
-          try {
-            if (!isNull && null != msr && msr.length() > 0) {
-              Object measureValueBasedOnDataType = DataTypeUtil
-                  .getMeasureValueBasedOnDataType(msr, msrDataType[meta.msrMapping[msrCount]],
-                      meta.carbonMeasures[meta.msrMapping[msrCount]]);
-              if (null == measureValueBasedOnDataType) {
-                addEntryToBadRecords(r, j, columnName,
-                    msrDataType[meta.msrMapping[msrCount]].name());
-                if (badRecordsLogger.isBadRecordConvertNullDisable()) {
-                  return null;
-                }
-                LOGGER.warn("Cannot convert : " + msr
-                    + " to Numeric type value. Value considered as null.");
-              }
-              out[memberMapping[dimLen + index] - meta.complexTypes.size()] =
-                  measureValueBasedOnDataType;
-            }
-          } catch (NumberFormatException e) {
-            failDataLoad(r, index, columnName,
-                msrDataType[meta.msrMapping[msrCount]].name());
-            addEntryToBadRecords(r, j, columnName, msrDataType[meta.msrMapping[msrCount]].name());
-            if (badRecordsLogger.isBadRecordConvertNullDisable()) {
-              return null;
-            }
-            LOGGER.warn(
-                "Cannot convert : " + msr + " to Numeric type value. Value considered as null.");
-            out[memberMapping[dimLen + index] - meta.complexTypes.size()] = null;
-          }
-        }
-        index++;
-        msrCount++;
-        if (presentColumnMapIndex[j] < 0 && null == foreignKeyColumnName) {
-          continue;
-        }
-      }
-      boolean isPresentInSchema = false;
-      if (null == foreignKeyColumnName) {
-        //if it enters here means 3 possibility
-        //1) this is not foreign key it can be direct columns
-        //2) This column present in the csv file but in the schema it is not present.
-        //3) This column can be measure column
-        int m = presentColumnMapIndex[j];
-        if (m >= 0) {
-          isPresentInSchema = true;
-        }
-        if (isPresentInSchema) {
-          foreignKeyColumnName = meta.dimColNames[m];
-        } else {
-          continue;
-        }
-      }
-      //If it refers to multiple hierarchy by same foreign key
-      if (foreignKeyMappingColumnsForMultiple[j] != null) {
-        String[] splittedHiers = foreignKeyMappingColumnsForMultiple[j];
-        for (String hierForignKey : splittedHiers) {
-          Dictionary dicCache = dictionaryCaches.get(hierForignKey);
-          String actualHierName = null;
-          if (!isPresentInSchema) {
-            actualHierName = meta.hierNames[l++];
-          }
-          Map<Integer, int[]> cache = surrogateKeyGen.getHierCache().get(actualHierName);
-          int[] surrogateKeyForHierarchy = null;
-          if (null != cache) {
-            Integer keyFromCsv = dicCache.getSurrogateKey(tuple);
-            if (null != keyFromCsv) {
-              surrogateKeyForHierarchy = cache.get(keyFromCsv);
-            } else {
-              addMemberNotExistEntry(r, j, columnName);
-              return null;
-            }
-            // If cardinality exceeded for some levels then
-            // for that hierarchy will not be their
-            // so while joining with fact table if we are
-            // getting this scenerio we will log it
-            // in bad records
-            if (null == surrogateKeyForHierarchy) {
-              addEntryToBadRecords(r, j, columnName);
-              return null;
-            }
-          } else {
-            surrogateKeyForHierarchy = new int[1];
-            surrogateKeyForHierarchy[0] =
-                surrogateKeyGen.generateSurrogateKeys(tuple, foreignKeyColumnName);
-          }
-          for (int k = 0; k < surrogateKeyForHierarchy.length; k++) {
-            if (dimPresentCsvOrder[i]) {
-              out[memberMapping[i]] = surrogateKeyForHierarchy[k];
-            }
-            i++;
-          }
-        }
-      } else if (isComplexTypeColumn[j]) {
-        //If it refers to single hierarchy
-        try {
-          GenericDataType complexType =
-              complexTypes[noDictionaryAndComplexIndexMapping[j] - meta.noDictionaryCols.length];
-          ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
-          DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
-          complexType.parseStringAndWriteByteArray(meta.getTableName(), tuple,
-              new String[] { meta.getComplexDelimiterLevel1(), meta.getComplexDelimiterLevel2() },
-              0, dataOutputStream, surrogateKeyGen);
-          byteBufferArr[noDictionaryAndComplexIndexMapping[j]] =
-              ByteBuffer.wrap(byteArray.toByteArray());
-          if (null != byteArray) {
-            byteArray.close();
-          }
-        } catch (IOException e1) {
-          throw new KettleException(
-              "Parsing complex string and generating surrogates/ByteArray failed. ", e1);
-        }
-        i++;
-      } else {
-        Dictionary dicCache = dictionaryCaches.get(foreignKeyColumnName);
-        String actualHierName = null;
-        if (!isPresentInSchema) {
-          actualHierName = meta.hierNames[l++];
-        }
-        Map<Integer, int[]> cache = surrogateKeyGen.getHierCache().get(actualHierName);
-        int[] surrogateKeyForHrrchy = null;
-        if (null != cache) {
-          Integer keyFromCsv = dicCache.getSurrogateKey(tuple);
-          if (null != keyFromCsv) {
-            surrogateKeyForHrrchy = cache.get(keyFromCsv);
-          } else {
-            addMemberNotExistEntry(r, j, columnName);
-            return null;
-          }
-          // If cardinality exceeded for some levels then for that hierarchy will not be their
-          // so while joining with fact table if we are getting this scenerio we will log it
-          // in bad records
-          if (null == surrogateKeyForHrrchy) {
-            addEntryToBadRecords(r, j, columnName);
-            return null;
-          }
-        } else {
-          int[] propIndex = propMap.get(foreignKeyColumnName);
-          Object[] properties;
-          if (null == propIndex) {
-            properties = new Object[0];
-          } else {
-            properties = new Object[propIndex.length];
-            for (int ind = 0; ind < propIndex.length; ind++) {
-              Object objectValue = r[propIndex[ind]];
-              properties[ind] = null == objectValue ?
-                  CarbonCommonConstants.MEMBER_DEFAULT_VAL : (String)objectValue;
-            }
-          }
-          surrogateKeyForHrrchy = new int[1];
-          if (isGenerated && !isNull) {
-            surrogateKeyForHrrchy[0] = generatedSurrogate;
-            isGenerated = false;
-            generatedSurrogate = -1;
-          } else {
-            int m = j;
-            if (isPresentInSchema) {
-              m = presentColumnMapIndex[j];
-            }
-            ColumnSchemaDetails details = columnSchemaDetailsWrapper.get(dimensionColumnIds[m]);
-            if (details.isDirectDictionary()) {
-              surrogateKeyForHrrchy[0] =
-                  directDictionaryGenerators[m].generateDirectSurrogateKey(tuple);
-              if (!isSerialized && surrogateKeyForHrrchy[0] == 1) {
-                failDataLoad(r, index, columnName, details.getColumnType().getName());
-                addEntryToBadRecords(r, j, columnName, details.getColumnType().name());
-                if (badRecordsLogger.isBadRecordConvertNullDisable()) {
-                  return null;
-                }
-              }
-              surrogateKeyGen.max[m] = Integer.MAX_VALUE;
-            } else {
-              String parsedValue = DataTypeUtil.parseValue(tuple, data.getSurrogateKeyGen()
-                  .getDimensionOrdinalToDimensionMapping()[memberMapping[i]]);
-              if (null == parsedValue) {
-                surrogateKeyForHrrchy[0] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
-              } else {
-                surrogateKeyForHrrchy[0] =
-                    surrogateKeyGen.generateSurrogateKeys(parsedValue, foreignKeyColumnName);
-              }
-            }
-          }
-          if (surrogateKeyForHrrchy[0] == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
-            if (!isSerialized) {
-              int m = j;
-              if (isPresentInSchema) {
-                m = presentColumnMapIndex[j];
-              }
-              ColumnSchemaDetails details = columnSchemaDetailsWrapper.get(dimensionColumnIds[m]);
-              failDataLoad(r, index, columnName, details.getColumnType().getName());
-              addEntryToBadRecords(r, j, columnName);
-              if (badRecordsLogger.isBadRecordConvertNullDisable()) {
-                return null;
-              }
-            }
-            surrogateKeyForHrrchy[0] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
-          }
-        }
-        for (int k = 0; k < surrogateKeyForHrrchy.length; k++) {
-          if (dimPresentCsvOrder[i]) {
-            if (duplicateColMapping[j] != null) {
-              for (int m = 0; m < duplicateColMapping[j].length; m++) {
-                out[duplicateColMapping[j][m]] = Integer.valueOf(surrogateKeyForHrrchy[k]);
-              }
-            } else {
-              out[memberMapping[i]] = Integer.valueOf(surrogateKeyForHrrchy[k]);
-            }
-          }
-          i++;
-        }
-      }
-    }
-    insertHierIfRequired(out);
-    NonDictionaryUtil
-        .prepareOut(newArray, byteBufferArr, out, dimLen - meta.complexTypes.size());
-    return newArray;
-  }
-  private void failDataLoad(Object[] row, int index, String columnName, String dataType)
-      throws KettleException {
-    if (badRecordsLogger.isDataLoadFail()) {
-      String errorMessage = getBadRecordEntry(row, index, columnName, dataType);
-      dis.setVariable(CarbonCommonConstants.BAD_RECORD_KEY, errorMessage);
-      LOGGER.error("Data load failed due to bad record. " + errorMessage);
-      throw new KettleException("Data load failed due to bad record");
-    }
-  }
-  private void addEntryToBadRecords(Object[] r, int j, String columnName, String dataType) {
-    dataType = DataTypeUtil.getColumnDataTypeDisplayName(dataType);
-    badRecordsLogger.addBadRecordsToBuilder(r,
-        "The value " + " \"" + r[j] + "\"" + " with column name " + columnName
-            + " and column data type " + dataType + " is not a valid " + dataType + " type.");
-  }
-  private String getBadRecordEntry(Object[] r, int j, String columnName, String dataType) {
-    dataType = DataTypeUtil.getColumnDataTypeDisplayName(dataType);
-    String badRecord = "The value " + " \"" + r[j] + "\"" + " with column name " + columnName
-        + " and column data type " + dataType + " is not a valid Record";
-    return badRecord;
-  }
-  private void addEntryToBadRecords(Object[] r, int j, String columnName) {
-    badRecordsLogger.addBadRecordsToBuilder(r,
-        "Surrogate key for value " + " \"" + r[j] + "\"" + " with column name " + columnName
-            + " not found in dictionary cache");
-  }
-  private void addMemberNotExistEntry(Object[] r, int j, String columnName) {
-    badRecordsLogger.addBadRecordsToBuilder(r,
-        "For Coulmn " + columnName + " \"" + r[j] + "\""
-            + " member not exist in the dimension table ");
-  }
-  private void insertHierIfRequired(Object[] out) throws KettleException {
-    if (denormHierarchies.size() > 0) {
-      insertHierarichies(out);
-    }
-  }
-  private int[] createPresentColumnMapIndex() {
-    int[] presentColumnMapIndex = new int[getInputRowMeta().size()];
-    duplicateColMapping = new int[getInputRowMeta().size()][];
-    Arrays.fill(presentColumnMapIndex, -1);
-    for (int j = 0; j < getInputRowMeta().size(); j++) {
-      String columnName = getInputRowMeta().getValueMeta(j).getName();
-      int m = 0;
-      String foreignKey = meta.foreignKeyHierarchyMap.get(columnName);
-      if (foreignKey == null) {
-        List<Integer> repeats = new ArrayList<Integer>(10);
-        for (String col : meta.dimColNames) {
-          if (col.equalsIgnoreCase(meta.getTableName() + '_' + columnName)) {
-            presentColumnMapIndex[j] = m;
-            repeats.add(m);
-          }
-          m++;
-        }
-        if (repeats.size() > 1) {
-          int[] dims = new int[repeats.size()];
-          for (int i = 0; i < dims.length; i++) {
-            dims[i] = repeats.get(i);
-          }
-          duplicateColMapping[j] = dims;
-        }
-      } else {
-        for (String col : meta.actualDimArray) {
-          if (col.equalsIgnoreCase(columnName)) {
-            presentColumnMapIndex[j] = m;
-            break;
-          }
-          m++;
-        }
-      }
-    }
-    return presentColumnMapIndex;
-  }
-  private int[] createPresentColumnMapIndexForAggregate() {
-    int[] presentColumnMapIndex = new int[getInputRowMeta().size()];
-    duplicateColMapping = new int[getInputRowMeta().size()][];
-    Arrays.fill(presentColumnMapIndex, -1);
-    for (int j = 0; j < getInputRowMeta().size(); j++) {
-      String columnName = getInputRowMeta().getValueMeta(j).getName();
-      int m = 0;
-      String foreignKey = meta.foreignKeyHierarchyMap.get(columnName);
-      if (foreignKey == null) {
-        for (String col : meta.actualDimArray) {
-          if (col.equalsIgnoreCase(columnName)) {
-            presentColumnMapIndex[j] = m;
-            break;
-          }
-          m++;
-        }
-      }
-    }
-    return presentColumnMapIndex;
-  }
-  private String[] createColumnArrayFromMeta() {
-    String[] metaColumnNames = new String[getInputRowMeta().size()];
-    for (int j = 0; j < getInputRowMeta().size(); j++) {
-      metaColumnNames[j] = getInputRowMeta().getValueMeta(j).getName();
-    }
-    return metaColumnNames;
-  }
-  private boolean[] createMeasureMappigs(List<String> measureCol) {
-    int size = getInputRowMeta().size();
-    boolean[] measurePresentMapping = new boolean[size];
-    for (int j = 0; j < size; j++) {
-      String columnName = getInputRowMeta().getValueMeta(j).getName();
-      for (String measure : measureCol) {
-        if (measure.equalsIgnoreCase(columnName)) {
-          measurePresentMapping[j] = true;
-          break;
-        }
-      }
-    }
-    return measurePresentMapping;
-  }
-  private boolean[] createMeasureSurrogateReqMapping() {
-    int size = getInputRowMeta().size();
-    boolean[] measureSuurogateReqMapping = new boolean[size];
-    for (int j = 0; j < size; j++) {
-      String columnName = getInputRowMeta().getValueMeta(j).getName();
-      Boolean isPresent = meta.getMeasureSurrogateRequired().get(columnName);
-      if (null != isPresent && isPresent) {
-        measureSuurogateReqMapping[j] = true;
-      }
-    }
-    return measureSuurogateReqMapping;
-  }
-  private void createForeignKeyMappingColumns() {
-    int size = getInputRowMeta().size();
-    foreignKeyMappingColumns = new String[size];
-    foreignKeyMappingColumnsForMultiple = new String[size][];
-    for (int j = 0; j < size; j++) {
-      String columnName = getInputRowMeta().getValueMeta(j).getName();
-      String foreignKeyColumnName = meta.foreignKeyPrimaryKeyMap.get(columnName);
-      if (foreignKeyColumnName != null) {
-        if (foreignKeyColumnName.indexOf(CarbonCommonConstants.COMA_SPC_CHARACTER) > -1) {
-          String[] splittedHiers =
-              foreignKeyColumnName.split(CarbonCommonConstants.COMA_SPC_CHARACTER);
-          foreignKeyMappingColumnsForMultiple[j] = splittedHiers;
-          foreignKeyMappingColumns[j] = foreignKeyColumnName;
-        } else {
-          foreignKeyMappingColumns[j] = foreignKeyColumnName;
-        }
-      }
-    }
-  }
-  private int createSurrogateForMeasure(String member, String columnName)
-      throws KettleException {
-    String colName = meta.getTableName() + '_' + columnName;
-    return data.getSurrogateKeyGen().getSurrogateForMeasure(member, colName);
-  }
-  private void insertHierarichies(Object[] rowWithKeys) throws KettleException {
-    try {
-      for (String hierName : denormHierarchies) {
-        String storeLocation = "";
-        String hierInprogName = hierName + CarbonCommonConstants.HIERARCHY_FILE_EXTENSION;
-        HierarchyValueWriterForCSV hierWriter = nrmlizedHierWriterMap.get(hierInprogName);
-        storeLocation = loadFolderLoc;
-        if (null == filemanager) {
-          filemanager = new FileManager();
-          filemanager.setName(storeLocation);
-        }
-        if (null == hierWriter) {
-          FileData fileData = new FileData(hierInprogName, storeLocation);
-          hierWriter = new HierarchyValueWriterForCSV(hierInprogName, storeLocation);
-          filemanager.add(fileData);
-          nrmlizedHierWriterMap.put(hierInprogName, hierWriter);
-        }
-        int[] levelsIndxs = meta.hirches.get(hierName);
-        int[] levelSKeys = new int[levelsIndxs.length];
-        if (meta.complexTypes.get(meta.hierColumnMap.get(hierName)[0]) == null) {
-          for (int i = 0; i < levelSKeys.length; i++) {
-            levelSKeys[i] = (Integer) rowWithKeys[levelsIndxs[i]];
-          }
-          if (levelSKeys.length > 1) {
-            data.getSurrogateKeyGen().checkNormalizedHierExists(levelSKeys, hierName, hierWriter);
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new KettleException(e.getMessage(), e);
-    }
-  }
-  private boolean isMeasureColumn(String msName, boolean compareWithTable) {
-    String msrNameTemp;
-    for (String msrName : meta.measureColumn) {
-      msrNameTemp = msrName;
-      if (compareWithTable) {
-        msrNameTemp = meta.getTableName() + '_' + msrNameTemp;
-      }
-      if (msrNameTemp.equalsIgnoreCase(msName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  private int[] getMeasureOriginalIndexes(String[] originalMsrCols) {
-    List<String> currMsrCol = new ArrayList<String>(10);
-    for (int i = 0; i < getInputRowMeta().size(); i++) {
-      String columnName = getInputRowMeta().getValueMeta(i).getName();
-      for (String measureCol : originalMsrCols) {
-        if (measureCol.equalsIgnoreCase(columnName)) {
-          currMsrCol.add(columnName);
-          break;
-        }
-      }
-    }
-    String[] currentMsrCols = currMsrCol.toArray(new String[currMsrCol.size()]);
-    int[] indexs = new int[currentMsrCols.length];
-    for (int i = 0; i < currentMsrCols.length; i++) {
-      for (int j = 0; j < originalMsrCols.length; j++) {
-        if (currentMsrCols[i].equalsIgnoreCase(originalMsrCols[j])) {
-          indexs[i] = j;
-          break;
-        }
-      }
-    }
-    return indexs;
-  }
-  private int[] getMemberMappingOriginalIndexes() {
-    int[] memIndexes = new int[meta.dimLens.length + meta.msrs.length];
-    Arrays.fill(memIndexes, -1);
-    String actualColumnName = null;
-    List<String> allColumnsNamesFromCSV = new ArrayList<String>(10);
-    for (int i = 0; i < getInputRowMeta().size(); i++) {
-      allColumnsNamesFromCSV.add(getInputRowMeta().getValueMeta(i).getName());
-    }
-    List<String> currentColName = new ArrayList<String>(meta.actualDimArray.length);
-    List<String> duplicateNames = new ArrayList<String>(10);
-    for (int i = 0; i < getInputRowMeta().size(); i++) {
-      String columnName = getInputRowMeta().getValueMeta(i).getName();
-      String hier = meta.foreignKeyHierarchyMap.get(columnName);
-      String uniqueName = meta.getTableName() + '_' + columnName;
-      if (null != hier) {
-        if (hier.indexOf(CarbonCommonConstants.COMA_SPC_CHARACTER) > -1) {
-          getCurrenColForMultiHier(currentColName, hier);
-        } else {
-          String tableName = meta.getHierDimTableMap().get(hier);
-          String[] columns = meta.hierColumnMap.get(hier);
-          if (null != columns) {
-            for (String column : columns) {
-              //currentColumnNames[k++] = column;
-              currentColName.add(tableName + '_' + column);
-            }
-          }
-        }
-        if (isMeasureColumn(columnName, false)) {
-          currentColName.add(uniqueName);
-        }
-      } else // then it can be direct column name if not foreign key.
-      {
-        if (!meta.isAggregate()) {
-          currentColName.add(uniqueName);
-          //add to duplicate column list if it is a repeated column. it is required since the
-          // member mapping is 1 to 1 mapping
-          //of csv columns and schema columns. so if schema columns are repeated then we have to
-          // handle it in special way.
-          checkAndAddDuplicateCols(duplicateNames, uniqueName);
-        } else {
-          actualColumnName = meta.columnAndTableNameColumnMapForAggMap.get(columnName);
-          if (actualColumnName != null) {
-            currentColName.add(meta.columnAndTableNameColumnMapForAggMap.get(columnName));
-          } else {
-            currentColName.add(uniqueName);
-          }
-        }
-      }
-    }
-    //Add the duplicate columns at the end so that it won't create any problem with current mapping.
-    currentColName.addAll(duplicateNames);
-    String[] currentColNamesArray = currentColName.toArray(new String[currentColName.size()]);
-    // We will use same array for dimensions and measures
-    // First create the mapping for dimensions.
-    int dimIndex = 0;
-    Map<String, Boolean> counterMap = new HashMap<String, Boolean>(16);
-    // Setting dimPresent value in CSV order as we need it later
-    dimPresentCsvOrder = new boolean[meta.dimPresent.length];
-    // var used to set measures value (in next loop)
-    int toAddInIndex = 0;
-    int tmpIndex = 0;
-    for (int i = 0; i < currentColNamesArray.length; i++) {
-      if (isMeasureColumn(currentColNamesArray[i], true) && isNotInDims(currentColNamesArray[i])) {
-        continue;
-      }
-      int n = 0;
-      for (int j = 0; j < meta.actualDimArray.length; j++) {
-        if (currentColNamesArray[i].equalsIgnoreCase(meta.dimColNames[j])) {
-          String mapKey = currentColNamesArray[i] + "__" + j;
-          if (null == counterMap.get(mapKey)) {
-            dimPresentCsvOrder[tmpIndex] = meta.dimPresent[j];//CHECKSTYLE:ON
-            tmpIndex++;
-            counterMap.put(mapKey, true);
-            if (!meta.dimPresent[j]) {
-              dimIndex++;
-              continue;
-            }
-            memIndexes[dimIndex++] = n;
-            // Added one more value to memIndexes, increase counter
-            toAddInIndex++;
-            break;
-          } else {
-            n++;
-            continue;
-          }
-        }
-        if (meta.dimPresent[j]) {
-          n++;
-        }
-      }
-    }
-    for (int actDimLen = 0; actDimLen < meta.actualDimArray.length; actDimLen++) {
-      boolean found = false;
-      for (int csvHeadLen = 0; csvHeadLen < currentColNamesArray.length; csvHeadLen++) {
-        if (meta.dimColNames[actDimLen].equalsIgnoreCase(currentColNamesArray[csvHeadLen])) {
-          found = true;
-          break;
-        }
-      }
-      if (!found) {
-        dimIndex++;
-        toAddInIndex++;
-      }
-    }
-    // Now create the mapping of measures
-    // There may be case when measure column is present in the CSV file
-    // but not present in the schema , in that case we need to skip that column while
-    // sending the output to next step.
-    // Or Measure can be in any ordinal in the csv
-    int k = 0;
-    Map<String, Boolean> existsMap = new HashMap<String, Boolean>(16);
-    for (int i = 0; i < currentColNamesArray.length; i++) {
-      k = calculateMeasureOriginalIndexes(memIndexes, currentColNamesArray, dimIndex, toAddInIndex,
-          k, existsMap, i);
-    }
-    return memIndexes;
-  }
-  private void getCurrenColForMultiHier(List<String> currentColName, String hier) {
-    String[] splittedHiers = hier.split(CarbonCommonConstants.COMA_SPC_CHARACTER);
-    for (String hierName : splittedHiers) {
-      String tableName = meta.getHierDimTableMap().get(hierName);
-      String[] cols = meta.hierColumnMap.get(hierName);
-      if (null != cols) {
-        for (String column : cols) {
-          currentColName.add(tableName + '_' + column);
-        }
-      }
-    }
-  }
-  private void checkAndAddDuplicateCols(List<String> duplicateNames, String uniqueName) {
-    boolean exists = false;
-    for (int i = 0; i < meta.dimColNames.length; i++) {
-      if (uniqueName.equals(meta.dimColNames[i])) {
-        if (exists) {
-          duplicateNames.add(uniqueName);
-        }
-        exists = true;
-      }
-    }
-  }
-  /**
-   * calculateMeasureOriginalIndexes
-   *
-   * @param memIndexes
-   * @param currentColNamesArray
-   * @param dimIndex
-   * @param toAddInIndex
-   * @param k
-   * @param existsMap
-   * @param i
-   * @return
-   */
-  public int calculateMeasureOriginalIndexes(int[] memIndexes, String[] currentColNamesArray,
-      int dimIndex, int toAddInIndex, int k, Map<String, Boolean> existsMap, int i) {
-    for (int j = 0; j < meta.measureColumn.length; j++) {
-      if (currentColNamesArray[i]
-          .equalsIgnoreCase(meta.getTableName() + '_' + meta.measureColumn[j])) {
-        if (existsMap.get(meta.measureColumn[j]) == null) {
-          memIndexes[k + dimIndex] = toAddInIndex + j;
-          k++;
-          existsMap.put(meta.measureColumn[j], true);
-          break;
-        }
-      }
-    }
-    return k;
-  }
-  private boolean isNotInDims(String columnName) {
-    for (String dimName : meta.dimColNames) {
-      if (dimName.equalsIgnoreCase(columnName)) {
-        return false;
-      }
-    }
-    return true;
-  }
-  private void closeConnections() throws KettleException {
-    try {
-      for (Entry<String, Connection> entry : cons.entrySet()) {
-        entry.getValue().close();
-      }
-      cons.clear();
-    } catch (Exception ex) {
-      throw new KettleException(ex.getMessage(), ex);
-    }
-  }
-  /**
-   * According to the hierarchies,generate the varLengthKeyGenerator
-   *
-   * @param keyGenerators
-   * @param hirches
-   * @param dimLens
-   */
-  private void updateHierarchyKeyGenerators(Map<String, KeyGenerator> keyGenerators,
-      Map<String, int[]> hirches, int[] dimLens, String[] dimCols) {
-    //
-    String timeHierNameVal = "";
-    if (meta.getCarbonTime() == null || "".equals(meta.getCarbonTime())) {
-      timeHierNameVal = "";
-    } else {
-      String[] hies = meta.getCarbonTime().split(":");
-      timeHierNameVal = hies[1];
-    }
-    // Set<Entry<String,int[]>> hierSet = hirches.entrySet();
-    Iterator<Entry<String, int[]>> itr = hirches.entrySet().iterator();
-    while (itr.hasNext()) {
-      Entry<String, int[]> hieEntry =;
-      int[] a = hieEntry.getValue();
-      int[] lens = new int[a.length];
-      String name = hieEntry.getKey();
-      //
-      if (name.equalsIgnoreCase(timeHierNameVal)) {
-        for (int i = 0; i < a.length; i++) { //CHECKSTYLE:OFF
-          lens[i] = dimLens[a[i]];
-        } //CHECKSTYLE:ON
-      } else {
-        String[] columns = meta.hierColumnMap.get(name);
-        if (meta.getComplexTypes().get(columns[0]) != null) {
-          continue;
-        }
-        boolean isNoDictionary = false;
-        for (int i = 0; i < a.length; i++) {
-          if (null != meta.noDictionaryCols && isDimensionNoDictionary(meta.noDictionaryCols,
-              columns[i])) {
-            isNoDictionary = true;
-            break;
-          }
-        }
-        //if no dictionary column then do not populate the dim lens
-        if (isNoDictionary) {
-          continue;
-        }
-        //
-        for (int i = 0; i < a.length; i++) {
-          int newIndex = -1;
-          for (int j = 0; j < dimCols.length; j++) {
-            //
-            if (checkDimensionColName(dimCols[j], columns[i])) {
-              newIndex = j;
-              break;
-            }
-          } //CHECKSTYLE:OFF
-          lens[i] = dimLens[newIndex];
-        } //CHECKSTYLE:ON
-      }
-      //
-      KeyGenerator generator = KeyGeneratorFactory.getKeyGenerator(lens);
-      keyGenerators.put(name, generator);
-    }
-    Iterator<Entry<String, GenericDataType>> complexMap =
-        meta.getComplexTypes().entrySet().iterator();
-    while (complexMap.hasNext()) {
-      Entry<String, GenericDataType> complexDataType =;
-      List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
-      complexDataType.getValue().getAllPrimitiveChildren(primitiveTypes);
-      for (GenericDataType eachPrimitive : primitiveTypes) {
-        KeyGenerator generator = KeyGeneratorFactory.getKeyGenerator(new int[] { -1 });
-        keyGenerators.put(eachPrimitive.getName(), generator);
-      }
-    }
-  }
-  private boolean checkDimensionColName(String dimColName, String hierName) {
-    String[] tables = meta.getDimTableArray();
-    for (String table : tables) {
-      String hierWithTableName = table + '_' + hierName;
-      if (hierWithTableName.equalsIgnoreCase(dimColName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
-    meta = (CarbonCSVBasedSeqGenMeta) smi;
-    data = (CarbonCSVBasedSeqGenData) sdi;
-    return super.init(smi, sdi);
-  }
-  public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
-    /**
-     * Fortify Fix: FORWARD_NULL
-     * Changed to ||
-     * previously there was && but actully in case any one the object being null can through the
-     * nullpointer exception
-     *
-     */
-    if (null == smi || null == sdi) {
-      return;
-    }
-    meta = (CarbonCSVBasedSeqGenMeta) smi;
-    data = (CarbonCSVBasedSeqGenData) sdi;
-    CarbonCSVBasedDimSurrogateKeyGen surKeyGen = data.getSurrogateKeyGen();
-    try {
-      closeConnections();
-      if (null != surKeyGen) {
-        surKeyGen.setHierCache(null);
-        surKeyGen.setHierCacheReverse(null);
-        surKeyGen.setTimeDimCache(null);
-        surKeyGen.setMax(null);
-        surKeyGen.setTimDimMax(null);
-        surKeyGen.close();
-      }
-    } catch (Exception e) {
-      LOGGER.error(e);
-    } finally {
-      if (null != surKeyGen) {
-        clearDictionaryCache();
-        surKeyGen.setDictionaryCaches(null);
-      }
-    }
-    nrmlizedHierWriterMap = null;
-    data.clean();
-    super.dispose(smi, sdi);
-    meta = null;
-    data = null;
-  }
-  /**
-   * This method will clear the dictionary access count so that any unused
-   * column can be removed from the cache
-   */
-  private void clearDictionaryCache() {
-    Map<String, Dictionary> dictionaryCaches = surrogateKeyGen.getDictionaryCaches();
-    List<Dictionary> reverseDictionaries = new ArrayList<>(dictionaryCaches.values());
-    for (int i = 0; i < reverseDictionaries.size(); i++) {
-      Dictionary dictionary = reverseDictionaries.get(i);
-      dictionary.clear();
-    }
-  }
-  private String processnoDictionaryDim(int index, String dimensionValue, String dataType,
-      boolean isStringDataType, ByteBuffer[] out) {
-    if (!(isStringDataType)) {
-      if (null == DataTypeUtil
-          .normalizeIntAndLongValues(dimensionValue, DataTypeUtil.getDataType(dataType))) {
-        dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
-      }
-    }
-    ByteBuffer buffer = ByteBuffer
-        .wrap(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-    buffer.rewind();
-    out[index] = buffer;
-    return dimensionValue;
-  }
-  /**
-   * @param NoDictionaryDims
-   * @param columnName
-   * @return true if the dimension is high cardinality.
-   */
-  private boolean isDimensionNoDictionary(String[] NoDictionaryDims, String columnName) {
-    for (String colName : NoDictionaryDims) {
-      if (colName
-          .equalsIgnoreCase(meta.getTableName() + CarbonCommonConstants.UNDERSCORE + columnName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  /**
-   * Observer class for thread execution
-   * In case of any failure we need stop all the running thread
-   */
-  private class ThreadStatusObserver {
-    /**
-     * Below method will be called if any thread fails during execution
-     *
-     * @param exception
-     */
-    public void notifyFailed(Throwable exception) throws RuntimeException {
-      exec.shutdownNow();
-      LOGGER.error(exception);
-      throw new RuntimeException(exception);
-    }
-  }
-  /**
-   * This method will fill the carbon measures
-   *
-   * @param measures
-   */
-  private void populateCarbonMeasures(String[] measures) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        meta.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + meta.getTableName());
-    meta.carbonMeasures = new CarbonMeasure[measures.length];
-    msrDataType = new DataType[measures.length];
-    for (int i = 0; i < measures.length; i++) {
-      CarbonMeasure carbonMeasure = carbonTable.getMeasureByName(meta.getTableName(), measures[i]);
-      msrDataType[i] = carbonMeasure.getDataType();
-      if (DataType.DECIMAL == carbonMeasure.getDataType()) {
-        meta.carbonMeasures[i] = carbonMeasure;
-      }
-    }
-  }
-  private CarbonDimension[] populateNameToCarbonDimensionMap() {
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        meta.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + meta.getTableName());
-    List<CarbonDimension> dimensionsList = carbonTable.getDimensionByTableName(meta.getTableName());
-    CarbonDimension[] dimensionOrdinalToDimensionMapping =
-        new CarbonDimension[meta.getColumnSchemaDetailsWrapper().getColumnSchemaDetailsMap()
-            .size()];
-    List<CarbonDimension> dimListExcludingNoDictionaryColumn = dimensionsList;
-    if (null != meta.getNoDictionaryDims() && meta.getNoDictionaryDims().length() > 0) {
-      dimListExcludingNoDictionaryColumn =
-          new ArrayList<>(dimensionsList.size() - meta.noDictionaryCols.length);
-      for (CarbonDimension dimension : dimensionsList) {
-        // Here if dimension.getEncoder() lnly contains Encoding.INVERTED_INDEX, it
-        // means that NoDicColumn using InvertedIndex, so not put it into dic dims list.
-        if (!dimension.getEncoder().isEmpty() && !((1 == dimension.getEncoder().size()) &&
-            dimension.getEncoder().contains(Encoding.INVERTED_INDEX))) {
-          dimListExcludingNoDictionaryColumn.add(dimension);
-        }
-      }
-    }
-    for (int i = 0; i < dimListExcludingNoDictionaryColumn.size(); i++) {
-      CarbonDimension dimension = dimListExcludingNoDictionaryColumn.get(meta.memberMapping[i]);
-      if (dimension.isComplex()) {
-        populateComplexDimension(dimensionOrdinalToDimensionMapping, dimension);
-      } else {
-        dimensionOrdinalToDimensionMapping[meta.memberMapping[i]] = dimension;
-      }
-    }
-    return dimensionOrdinalToDimensionMapping;
-  }
-  private void populateComplexDimension(CarbonDimension[] dimensionOrdinalToDimensionMapping,
-      CarbonDimension dimension) {
-    List<CarbonDimension> listOfChildDimensions = dimension.getListOfChildDimensions();
-    for (CarbonDimension childDimension : listOfChildDimensions) {
-      if (childDimension.isComplex()) {
-        populateComplexDimension(dimensionOrdinalToDimensionMapping, childDimension);
-      } else {
-        dimensionOrdinalToDimensionMapping[childDimension.getOrdinal()] = childDimension;
-      }
-    }
-  }