You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by ravipesala <gi...@git.apache.org> on 2017/03/01 16:30:41 UTC

[GitHub] incubator-carbondata pull request #620: [WIP]Added batch sort to improve the...

GitHub user ravipesala opened a pull request:

    https://github.com/apache/incubator-carbondata/pull/620

    [WIP]Added batch sort to improve the loading performance

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata batch-sort

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #620
    
----
commit e440bce45913ea3a643ac647d245b130f73db3dd
Author: ravipesala <ra...@gmail.com>
Date:   2017-03-01T16:27:32Z

    Added batch sort to improve the loading performance

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [CARBONDATA-742] Added batch sort to improv...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105907719
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java ---
    @@ -58,11 +59,17 @@ public void initialize() throws IOException {
         boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
                 CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
    +    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
    +        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
    +            CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
         if (offheapsort) {
    --- End diff --
    
    This should be put under else clause of `if (batchSort)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105873397
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---
    @@ -352,9 +352,9 @@ public String getCarbonDataDirectoryPath(String partitionId, String segmentId) {
        * @return gets data file name only with out path
        */
       public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber,
    -      String factUpdateTimeStamp) {
    -    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "-" + bucketNumber + "-"
    -        + factUpdateTimeStamp + CARBON_DATA_EXT;
    +      int taskExtension, String factUpdateTimeStamp) {
    +    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "_" + taskExtension + "-" + bucketNumber
    --- End diff --
    
    I did not get, Can you give more information on it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106135534
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java ---
    @@ -58,11 +59,17 @@ public void initialize() throws IOException {
         boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
                 CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
    +    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
    +        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
    +            CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
         if (offheapsort) {
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106133451
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException(e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +      }
    +    }
    +  }
    +
    +  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
    +      SegmentProperties segmentProperties)
    +      throws CarbonDataLoadingException {
    +    int batchSize = 0;
    +    try {
    +      while (batch.hasNext()) {
    +        CarbonRow row = batch.next();
    +        batchSize++;
    +        Object[] outputRow;
    +        // adding one for the high cardinality dims byte array.
    +        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
    +          outputRow = new Object[measureCount + 1 + 1];
    +        } else {
    +          outputRow = new Object[measureCount + 1];
    +        }
    +
    +        int l = 0;
    +        int index = 0;
    +        Object[] measures = row.getObjectArray(measureIndex);
    +        for (int i = 0; i < measureCount; i++) {
    +          outputRow[l++] = measures[index++];
    +        }
    +        outputRow[l] = row.getObject(noDimByteArrayIndex);
    +
    +        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
    +        int[] dimsArray = row.getIntArray(dimsArrayIndex);
    +        for (int i = 0; i < highCardExcludedRows.length; i++) {
    --- End diff --
    
    This copy is not required. :) I removed it 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106128671
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    --- End diff --
    
    ok, removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106128868
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105898878
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    --- End diff --
    
    mentioning Batch Sort


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [CARBONDATA-742]Added batch sort to improve...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    For one batch, one file/Btree is created. Since I have given the batch size as 450MB it can do batch process for every 450 MB of collected data.  If I give my batch size as 900MB then it creates only 7 files.  The files/Btrees will get reduce as you increase the batch size. 
    
    Yes, even I have noticed that some times spark can do process well if the blocks are small. That might be the reason Q3 and Q8 is faster.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106133491
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106134851
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException(e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +      }
    +    }
    +  }
    +
    +  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
    +      SegmentProperties segmentProperties)
    +      throws CarbonDataLoadingException {
    +    int batchSize = 0;
    +    try {
    +      while (batch.hasNext()) {
    +        CarbonRow row = batch.next();
    +        batchSize++;
    +        Object[] outputRow;
    +        // adding one for the high cardinality dims byte array.
    +        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
    +          outputRow = new Object[measureCount + 1 + 1];
    +        } else {
    +          outputRow = new Object[measureCount + 1];
    +        }
    +
    +        int l = 0;
    +        int index = 0;
    +        Object[] measures = row.getObjectArray(measureIndex);
    +        for (int i = 0; i < measureCount; i++) {
    +          outputRow[l++] = measures[index++];
    +        }
    +        outputRow[l] = row.getObject(noDimByteArrayIndex);
    +
    +        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
    +        int[] dimsArray = row.getIntArray(dimsArrayIndex);
    +        for (int i = 0; i < highCardExcludedRows.length; i++) {
    +          highCardExcludedRows[i] = dimsArray[i];
    +        }
    +
    +        outputRow[outputRow.length - 1] =
    --- End diff --
    
    yes, comment is added


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106134073
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException(e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +      }
    +    }
    +  }
    +
    +  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
    +      SegmentProperties segmentProperties)
    +      throws CarbonDataLoadingException {
    +    int batchSize = 0;
    +    try {
    +      while (batch.hasNext()) {
    +        CarbonRow row = batch.next();
    +        batchSize++;
    +        Object[] outputRow;
    +        // adding one for the high cardinality dims byte array.
    +        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
    +          outputRow = new Object[measureCount + 1 + 1];
    +        } else {
    +          outputRow = new Object[measureCount + 1];
    +        }
    +
    +        int l = 0;
    +        int index = 0;
    +        Object[] measures = row.getObjectArray(measureIndex);
    +        for (int i = 0; i < measureCount; i++) {
    +          outputRow[l++] = measures[index++];
    +        }
    +        outputRow[l] = row.getObject(noDimByteArrayIndex);
    +
    +        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
    --- End diff --
    
    copy not required, removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106128302
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    --- End diff --
    
    if it already exists then we do nothing and continue loading.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [CARBONDATA-742] Added batch sort to improv...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1225/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Failed  with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/991/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/998/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106138538
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---
    @@ -236,9 +236,9 @@ public String getTableUpdateStatusFilePath() {
        * @return absolute path of data file stored in carbon data format
        */
       public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
    -      Integer taskNo, int bucketNumber, String factUpdateTimeStamp) {
    +      Integer taskNo, int taskExtension, int bucketNumber, String factUpdateTimeStamp) {
    --- End diff --
    
    Ok, updated


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106125107
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---
    @@ -52,10 +53,15 @@
     
       public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
           CarbonIterator[] inputIterators) throws Exception {
    +    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
    +        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
    --- End diff --
    
    I guess it would be better to add in load option. I will raise jira for it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Failed  with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/987/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/999/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105898310
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException(e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +      }
    +    }
    +  }
    +
    +  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
    +      SegmentProperties segmentProperties)
    +      throws CarbonDataLoadingException {
    +    int batchSize = 0;
    +    try {
    +      while (batch.hasNext()) {
    +        CarbonRow row = batch.next();
    +        batchSize++;
    +        Object[] outputRow;
    +        // adding one for the high cardinality dims byte array.
    +        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
    +          outputRow = new Object[measureCount + 1 + 1];
    +        } else {
    +          outputRow = new Object[measureCount + 1];
    +        }
    +
    +        int l = 0;
    +        int index = 0;
    +        Object[] measures = row.getObjectArray(measureIndex);
    +        for (int i = 0; i < measureCount; i++) {
    +          outputRow[l++] = measures[index++];
    +        }
    +        outputRow[l] = row.getObject(noDimByteArrayIndex);
    +
    +        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
    +        int[] dimsArray = row.getIntArray(dimsArrayIndex);
    +        for (int i = 0; i < highCardExcludedRows.length; i++) {
    +          highCardExcludedRows[i] = dimsArray[i];
    +        }
    +
    +        outputRow[outputRow.length - 1] =
    +            segmentProperties.getDimensionKeyGenerator().generateKey(highCardExcludedRows);
    +        dataHandler.addDataToStore(outputRow);
    +      }
    +    } catch (Exception e) {
    --- End diff --
    
    `execute()` is catching all exception, so no need to catch here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105896498
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    --- End diff --
    
    Can be moved to line 109


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742]Added batch sort to ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r104385171
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---
    @@ -0,0 +1,270 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.sort.impl;
    +
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.newflow.row.CarbonSortBatch;
    +import org.apache.carbondata.processing.newflow.sort.Sorter;
    +import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
    +import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
    +import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
    +import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
    +import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
    +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +
    +/**
    + * It parallely reads data from array of iterates and do merge sort.
    + * It sorts data in batches and send to the next step.
    + */
    +public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
    --- End diff --
    
    Yes we do sort in-memory, it sorts the data chunk by chunk (default size 64 MB) and kept them in memory, once the batch memory reaches then it starts merge sort and gives to the data writer. This approach is faster than sort the big batch of records once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105839493
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java ---
    @@ -34,6 +35,9 @@
     
       public static final int DOUBLE_ARRAY_OFFSET;
     
    +  public static final boolean LITTLEENDIAN =
    --- End diff --
    
    rename to `isLittleEndian`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Failed  with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/995/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Failed  with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/990/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105908576
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---
    @@ -236,9 +236,9 @@ public String getTableUpdateStatusFilePath() {
        * @return absolute path of data file stored in carbon data format
        */
       public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
    -      Integer taskNo, int bucketNumber, String factUpdateTimeStamp) {
    +      Integer taskNo, int taskExtension, int bucketNumber, String factUpdateTimeStamp) {
    --- End diff --
    
    How about rename `taskExtension` to `batchNo`. It is the batch number of the batch sort, right? And in the file name, give a prefix like `part-0-batchNo1-xxx`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105838661
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
    @@ -1149,6 +1149,19 @@
     
       public static final String USE_KETTLE_DEFAULT = "false";
     
    +  /**
    +   * Sorts the data in batches and writes the batch data to store with index file.
    +   */
    +  public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
    +
    +  public static final String LOAD_USE_BATCH_SORT_DEFAULT = "true";
    --- End diff --
    
    Better to make it `false` as of now, and revisit it after checking the performance and loading and reading. 
    And please add comment mentioning something like "if fast loading is favored, set it to true. If fast query performance is favored, set it to false"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Failed  with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/989/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105907908
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java ---
    @@ -74,7 +81,6 @@ public void initialize() throws IOException {
       public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
         final Iterator<CarbonRowBatch>[] iterators = child.execute();
         Iterator<CarbonRowBatch>[] sortedIterators = sorter.sort(iterators);
    -    child.close();
    --- End diff --
    
    Is this a bug in old code? Why is it removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105906694
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException(e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +      }
    +    }
    +  }
    +
    +  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
    +      SegmentProperties segmentProperties)
    +      throws CarbonDataLoadingException {
    +    int batchSize = 0;
    +    try {
    +      while (batch.hasNext()) {
    +        CarbonRow row = batch.next();
    +        batchSize++;
    +        Object[] outputRow;
    +        // adding one for the high cardinality dims byte array.
    +        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
    +          outputRow = new Object[measureCount + 1 + 1];
    +        } else {
    +          outputRow = new Object[measureCount + 1];
    +        }
    +
    +        int l = 0;
    +        int index = 0;
    +        Object[] measures = row.getObjectArray(measureIndex);
    +        for (int i = 0; i < measureCount; i++) {
    +          outputRow[l++] = measures[index++];
    +        }
    +        outputRow[l] = row.getObject(noDimByteArrayIndex);
    +
    +        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
    +        int[] dimsArray = row.getIntArray(dimsArrayIndex);
    +        for (int i = 0; i < highCardExcludedRows.length; i++) {
    +          highCardExcludedRows[i] = dimsArray[i];
    +        }
    +
    +        outputRow[outputRow.length - 1] =
    --- End diff --
    
    What is the data arrangement inside outputRow? It seems like first putting measure columns, then no dictionary columns and last element is dictionary keys in byte array? Can you comment this outputRow format in the function header


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105905090
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException(e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +      }
    +    }
    +  }
    +
    +  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
    +      SegmentProperties segmentProperties)
    +      throws CarbonDataLoadingException {
    +    int batchSize = 0;
    +    try {
    +      while (batch.hasNext()) {
    +        CarbonRow row = batch.next();
    +        batchSize++;
    +        Object[] outputRow;
    +        // adding one for the high cardinality dims byte array.
    +        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
    +          outputRow = new Object[measureCount + 1 + 1];
    +        } else {
    +          outputRow = new Object[measureCount + 1];
    +        }
    +
    +        int l = 0;
    +        int index = 0;
    +        Object[] measures = row.getObjectArray(measureIndex);
    +        for (int i = 0; i < measureCount; i++) {
    +          outputRow[l++] = measures[index++];
    +        }
    +        outputRow[l] = row.getObject(noDimByteArrayIndex);
    +
    +        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
    --- End diff --
    
    Can check the length whether it is 0 before creating and copying data


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105909540
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---
    @@ -52,10 +53,15 @@
     
       public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
           CarbonIterator[] inputIterators) throws Exception {
    +    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
    +        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
    --- End diff --
    
    Yes, is it possible to add it in load option in this PR? If not, in create table is fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [CARBONDATA-742]Added batch sort to improve...

Posted by chenliang613 <gi...@git.apache.org>.
Github user chenliang613 commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    please change the title as per the format: [CARBONDATA-issue number>] Title of the pull request (need to add a blank)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [CARBONDATA-742]Added batch sort to improve...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Data size -> 100 million records
    **DDL and Queries for test**
    CREATE TABLE perftesta (c1 string,c2 string,c3 string,c4 string,c5 string,c6 bigint,c7 double,c8 int,c9 double,c10 double) STORED BY 'carbondata';
    
    Q1 -> select count(*) from perftesta;
    Q2 -> SELECT c3, c4, sum(c8) FROM perftesta WHERE c1 = 'P1_24521' GROUP BY c3, c4;
    Q3 -> SELECT c2, c5, count(distinct c1), sum(c7) FROM perftesta WHERE c4="P4_4" and c5="P5_7" and c8>4 GROUP BY c2, c5;
    Q4 -> SELECT c2, c5, count(distinct c1), sum(c7) FROM perftesta WHERE c4="P4_4" and c5="P5_7" GROUP BY c2, c5;
    Q5 -> SELECT c4 FROM perftesta WHERE c1="P1_24521";
    Q6 -> SELECT * FROM perftesta WHERE c2="P2_43";
    Q7 -> SELECT sum(c7), sum(c8), avg(c9), max(c10) FROM perftesta;
    Q8 -> SELECT sum(c7), sum(c8), sum(9), sum(c10) FROM perftesta WHERE c2="P2_75" and c6<5;
    Q9 -> SELECT sum(c7), sum(c8), sum(9), sum(c10) FROM perftesta WHERE c2="P2_75";
    Q10 -> SELECT count(c1),count(c2),count(c3),count(c4),count(c5),count(c6),count(c7),count(c8),count(c9),count(c10) FROM perftesta;
    
    **With Batch Sort**
    Load with inmemory size 1GB(with unsafe sort) so batch size will be ~450MB -->   Time : 324 seconds
    Total blocks created 14 files with each 105MB
    
    Query(first reading, second reading)
    Q1 (6.577, 3.404)
    Q2 (3.414, 1.639)
    Q3 (8.552, 7.572)
    Q4 (5.033, 3.875)
    Q5 (0.616, 0.456)
    Q6 (7.978, 7.682)
    Q7 (3.985, 2.909)
    Q8 (8.93, 8.697)
    Q9 (3.606, 3.305)
    Q10 (8.51, 8.367)
    
    **With complete sort (old flow)**
    Load with inmemory size 1GB with unsafe sort -->   Time : 430 seconds
    Total blocks created 2 files with 920MB and 560MB
    
    Query(first reading, second reading)
    Q1 (7.473,2.254)
    Q2 (2.635, 0.678)
    Q3 (11.411, 9.322)
    Q4 (4.422, 3.883)
    Q5 (0.332,0.22)
    Q6 (8.580, 8.187)
    Q7 (4.364, 3.617)
    Q8 (12.033, 12.138)
    Q9 (3.622, 3.695)
    Q10 (8.39, 8.941)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105873553
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---
    @@ -52,10 +53,15 @@
     
       public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
           CarbonIterator[] inputIterators) throws Exception {
    +    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
    +        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
    --- End diff --
    
    You mean we should add to tableproperties while creating the table? Or it should be dataloading option inside `LOAD` command?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [CARBONDATA-742]Added batch sort to improve...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    With Batch Sort:
    How many batch is processed within 14 files? Basically I wanted to know how many more B tree is created comparing to With complete sort approach.
    It is a bit strange that Q3 and Q8 is faster with Batch Sort.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105909192
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---
    @@ -352,9 +352,9 @@ public String getCarbonDataDirectoryPath(String partitionId, String segmentId) {
        * @return gets data file name only with out path
        */
       public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber,
    -      String factUpdateTimeStamp) {
    -    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "-" + bucketNumber + "-"
    -        + factUpdateTimeStamp + CARBON_DATA_EXT;
    +      int taskExtension, String factUpdateTimeStamp) {
    +    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "_" + taskExtension + "-" + bucketNumber
    --- End diff --
    
    I added another comment at last:
    
    > How about rename taskExtension to batchNo. It is the batch number of the batch sort, right? And in the file name, give a prefix like part-0-batchNo1-xxx



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/994/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [CARBONDATA-742] Added batch sort to improv...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    I checked again, and it seems there is no new test case added for this feature. Please add test case for it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105896311
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    --- End diff --
    
    It seems no function is throwing this exception


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105838407
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
    @@ -1149,6 +1149,19 @@
     
       public static final String USE_KETTLE_DEFAULT = "false";
     
    +  /**
    +   * Sorts the data in batches and writes the batch data to store with index file.
    +   */
    +  public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
    +
    +  public static final String LOAD_USE_BATCH_SORT_DEFAULT = "true";
    +
    +  /**
    +   * Size of batch data to keep in memory, as a thumb rule it supposed
    +   * to be less than 45% of sort.inmemory.size.inmb otherwise it may spill intermediate data to disk
    +   */
    +  public static final String LOAD_BATCH_SORT_SIZE_INMB = "carbon.load.batch.sort.size.inmb";
    --- End diff --
    
    I think it is better to move it up nearby  where `IN_MEMORY_FOR_SORT_DATA_IN_MB` is


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105854283
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
    @@ -1149,6 +1149,19 @@
     
       public static final String USE_KETTLE_DEFAULT = "false";
     
    +  /**
    +   * Sorts the data in batches and writes the batch data to store with index file.
    +   */
    +  public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
    +
    +  public static final String LOAD_USE_BATCH_SORT_DEFAULT = "true";
    +
    +  /**
    +   * Size of batch data to keep in memory, as a thumb rule it supposed
    +   * to be less than 45% of sort.inmemory.size.inmb otherwise it may spill intermediate data to disk
    +   */
    +  public static final String LOAD_BATCH_SORT_SIZE_INMB = "carbon.load.batch.sort.size.inmb";
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105898572
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException(e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +      }
    +    }
    +  }
    +
    +  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
    +      SegmentProperties segmentProperties)
    +      throws CarbonDataLoadingException {
    +    int batchSize = 0;
    +    try {
    +      while (batch.hasNext()) {
    +        CarbonRow row = batch.next();
    +        batchSize++;
    +        Object[] outputRow;
    +        // adding one for the high cardinality dims byte array.
    +        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
    +          outputRow = new Object[measureCount + 1 + 1];
    +        } else {
    +          outputRow = new Object[measureCount + 1];
    +        }
    +
    +        int l = 0;
    +        int index = 0;
    +        Object[] measures = row.getObjectArray(measureIndex);
    +        for (int i = 0; i < measureCount; i++) {
    +          outputRow[l++] = measures[index++];
    +        }
    +        outputRow[l] = row.getObject(noDimByteArrayIndex);
    +
    +        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
    +        int[] dimsArray = row.getIntArray(dimsArrayIndex);
    +        for (int i = 0; i < highCardExcludedRows.length; i++) {
    --- End diff --
    
    Can be replaced by System.arraycopy


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106136281
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java ---
    @@ -74,7 +81,6 @@ public void initialize() throws IOException {
       public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
         final Iterator<CarbonRowBatch>[] iterators = child.execute();
         Iterator<CarbonRowBatch>[] sortedIterators = sorter.sort(iterators);
    -    child.close();
    --- End diff --
    
    It is removed as part of refactoring of code to support batch sort. Earlier it was needed to get the cardinality of all dimensions. Now there is a new interface `DictionaryCardinalityFinder` has been added to figure out the cardinalities.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [CARBONDATA-742] Added batch sort to improv...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    @ravipesala please rebase


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106127463
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    --- End diff --
    
    Actually it is not always get the data from in-memory sorted files. It actually gets the data batch of sort files(it could be in-memory/disk). It always depends on batch size and memory availability. If the memory configured is 1GB and batch size configured is 2GB then after it reaches the memory limit it flushes to disk.
    I will update the comment as per the above


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106129176
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException(e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +      }
    +    }
    +  }
    +
    +  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
    +      SegmentProperties segmentProperties)
    +      throws CarbonDataLoadingException {
    +    int batchSize = 0;
    +    try {
    +      while (batch.hasNext()) {
    +        CarbonRow row = batch.next();
    +        batchSize++;
    +        Object[] outputRow;
    +        // adding one for the high cardinality dims byte array.
    +        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
    +          outputRow = new Object[measureCount + 1 + 1];
    +        } else {
    +          outputRow = new Object[measureCount + 1];
    +        }
    +
    +        int l = 0;
    +        int index = 0;
    +        Object[] measures = row.getObjectArray(measureIndex);
    +        for (int i = 0; i < measureCount; i++) {
    +          outputRow[l++] = measures[index++];
    +        }
    +        outputRow[l] = row.getObject(noDimByteArrayIndex);
    +
    +        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
    +        int[] dimsArray = row.getIntArray(dimsArrayIndex);
    +        for (int i = 0; i < highCardExcludedRows.length; i++) {
    +          highCardExcludedRows[i] = dimsArray[i];
    +        }
    +
    +        outputRow[outputRow.length - 1] =
    +            segmentProperties.getDimensionKeyGenerator().generateKey(highCardExcludedRows);
    +        dataHandler.addDataToStore(outputRow);
    +      }
    +    } catch (Exception e) {
    --- End diff --
    
    ok, removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105851374
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---
    @@ -52,10 +53,15 @@
     
       public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
           CarbonIterator[] inputIterators) throws Exception {
    +    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
    +        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
    --- End diff --
    
    Should this configuration be a table level option? If so, user can control it when creating table instead of relying global configuration


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105854781
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java ---
    @@ -34,6 +35,9 @@
     
       public static final int DOUBLE_ARRAY_OFFSET;
     
    +  public static final boolean LITTLEENDIAN =
    --- End diff --
    
    It is `static final` so better to keep in caps,  I will rename to `ISLITTLEENDIAN`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105895986
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    --- End diff --
    
    How about if the dir already exist?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r106133618
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105895379
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    --- End diff --
    
    suggest mentioning `from in-memory sorted files `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    Build Failed  with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/988/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105854421
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
    @@ -1149,6 +1149,19 @@
     
       public static final String USE_KETTLE_DEFAULT = "false";
     
    +  /**
    +   * Sorts the data in batches and writes the batch data to store with index file.
    +   */
    +  public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
    +
    +  public static final String LOAD_USE_BATCH_SORT_DEFAULT = "true";
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105842635
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---
    @@ -352,9 +352,9 @@ public String getCarbonDataDirectoryPath(String partitionId, String segmentId) {
        * @return gets data file name only with out path
        */
       public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber,
    -      String factUpdateTimeStamp) {
    -    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "-" + bucketNumber + "-"
    -        + factUpdateTimeStamp + CARBON_DATA_EXT;
    +      int taskExtension, String factUpdateTimeStamp) {
    +    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "_" + taskExtension + "-" + bucketNumber
    --- End diff --
    
    Should we give some prefix to the taskExtension to make it more clear? I feel it is too many number


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742]Added batch sort to ...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r104316373
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---
    @@ -0,0 +1,270 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.sort.impl;
    +
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.newflow.row.CarbonSortBatch;
    +import org.apache.carbondata.processing.newflow.sort.Sorter;
    +import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
    +import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
    +import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
    +import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
    +import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
    +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +
    +/**
    + * It parallely reads data from array of iterates and do merge sort.
    + * It sorts data in batches and send to the next step.
    + */
    +public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
    --- End diff --
    
    Is this sorter still doing merge? I though it should do in-memory sort only


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-carbondata/pull/620


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata issue #620: [WIP]Added batch sort to improve the loadin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/620
  
    retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #620: [CARBONDATA-742] Added batch sort to...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/620#discussion_r105899292
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.newflow.steps;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.IgnoreDictionary;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    +import org.apache.carbondata.processing.store.CarbonFactHandler;
    +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
    +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from sorted files which are generated in previous sort step.
    + * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
    + */
    +public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
    +
    +  private int noDictionaryCount;
    +
    +  private int complexDimensionCount;
    +
    +  private int measureCount;
    +
    +  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
    +
    +  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
    +
    +  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
    +
    +  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child) {
    +    super(configuration, child);
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return child.getOutput();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    child.initialize();
    +  }
    +
    +  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
    +    String storeLocation = CarbonDataProcessorUtil
    +        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
    +            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
    +            configuration.getSegmentId() + "", false);
    +    new File(storeLocation).mkdirs();
    +    return storeLocation;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    +    Iterator<CarbonRowBatch>[] iterators = child.execute();
    +    CarbonTableIdentifier tableIdentifier =
    +        configuration.getTableIdentifier().getCarbonTableIdentifier();
    +    String tableName = tableIdentifier.getTableName();
    +    try {
    +      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
    +          .createCarbonFactDataHandlerModel(configuration,
    +              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
    +      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
    +      complexDimensionCount = configuration.getComplexDimensionCount();
    +      measureCount = dataHandlerModel.getMeasureCount();
    +
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +              System.currentTimeMillis());
    +      int i = 0;
    +      for (Iterator<CarbonRowBatch> iterator : iterators) {
    +        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
    +        CarbonFactHandler dataHandler = null;
    +        int k = 0;
    +        while (iterator.hasNext()) {
    +          CarbonRowBatch next = iterator.next();
    +          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
    +              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
    +          dataHandler = CarbonFactHandlerFactory
    +              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
    +          dataHandler.initialise();
    +          processBatch(next, dataHandler, model.getSegmentProperties());
    +          finish(tableName, dataHandler);
    +        }
    +        i++;
    +      }
    +
    +    } catch (CarbonDataWriterException e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException(
    +          "Error while initializing data handler : " + e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
    +      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
    +    }
    +    return null;
    +  }
    +
    +  @Override protected String getStepName() {
    +    return "Data Writer";
    +  }
    +
    +  private void finish(String tableName, CarbonFactHandler dataHandler) {
    +    try {
    +      dataHandler.finish();
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
    +    }
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    +    processingComplete(dataHandler);
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
    +            System.currentTimeMillis());
    +    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
    +        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
    +  }
    +
    +  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
    +    if (null != dataHandler) {
    +      try {
    +        dataHandler.closeHandler();
    +      } catch (CarbonDataWriterException e) {
    --- End diff --
    
    These two catch are the same, keep one is enough


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---