Posted to dev@carbondata.apache.org by ravipesala <gi...@git.apache.org> on 2016/10/14 17:13:25 UTC

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

GitHub user ravipesala opened a pull request:

    https://github.com/apache/incubator-carbondata/pull/240

    [CARBONDATA-298]Added InputProcessorStep to read data from csv reader iterator.

    Add InputProcessorStep, which iterates over the record reader of the CSV input and parses the data according to each column's data type.
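For orientation, the core scheduling idea of the step (spreading the input iterators round-robin across the configured number of cores, see partitionInputReaderIterators in the diff quoted later in this thread) can be sketched as below. This is an illustrative, self-contained sketch; `RoundRobinPartitioner` is a hypothetical name, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (assumption, not PR code): distribute N input
// iterators over at most `partitions` buckets in round-robin order, so
// each parallel consumer drains roughly the same number of inputs.
class RoundRobinPartitioner {
  static <T> List<List<T>> partition(List<T> inputs, int partitions) {
    // Never create more buckets than there are inputs.
    int n = Math.min(partitions, inputs.size());
    List<List<T>> buckets = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      buckets.add(new ArrayList<T>());
    }
    for (int i = 0; i < inputs.size(); i++) {
      buckets.get(i % n).add(inputs.get(i));
    }
    return buckets;
  }
}
```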

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata input-processor-step

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #240
    
----
commit 96c46d2d31c2f80b89ff755c3683c08b24eca042
Author: ravipesala <ra...@gmail.com>
Date:   2016-10-14T17:09:58Z

    Added InputProcessorStep to read data from csv reader iterator.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84485160
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java ---
    @@ -33,4 +33,8 @@
       public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
     
       public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
    +
    --- End diff --
    
    OK, we should separate out the options exposed to the user. Let's do it in a future PR.



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r83506371
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void intialize() throws CarbonDataLoadingException {
    --- End diff --
    
    Typo: `intialize` should be `initialize`.



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84476341
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    +    if (inputIterators.size() < numberOfCores) {
    +      numberOfCores = inputIterators.size();
    +    }
    +    List<Iterator<Object[]>>[] iterators = new List[numberOfCores];
    +    for (int i = 0; i < numberOfCores; i++) {
    +      iterators[i] = new ArrayList<>();
    +    }
    +
    +    for (int i = 0; i < inputIterators.size(); i++) {
    +      iterators[i % numberOfCores].add(inputIterators.get(i));
    +
    +    }
    +    return iterators;
    +  }
    +
    +  @Override protected CarbonRow processRow(CarbonRow row) {
    +    return null;
    +  }
    +
    +  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
    +
    +    private List<Iterator<Object[]>> inputIterators;
    +
    +    private GenericParser[] genericParsers;
    +
    +    private Iterator<Object[]> currentIterator;
    +
    +    private int counter;
    +
    +    private int batchSize;
    +
    +    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
    +        GenericParser[] genericParsers, int batchSize) {
    --- End diff --
    
    It is consistent, as the generic parsers are created from the `DataField[]` returned by `getOutput`.



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84237159
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
    +
    +    private List<Iterator<Object[]>> inputIterators;
    +
    +    private GenericParser[] genericParsers;
    +
    +    private Iterator<Object[]> currentIterator;
    +
    +    private int counter;
    +
    +    private int batchSize;
    +
    +    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
    +        GenericParser[] genericParsers, int batchSize) {
    +      this.inputIterators = inputIterators;
    +      this.genericParsers = genericParsers;
    +      this.batchSize = batchSize;
    +      currentIterator = inputIterators.get(counter++);
    +    }
    +
    +    @Override public boolean hasNext() {
    +      return internalHasNext();
    +    }
    +
    +    private boolean internalHasNext() {
    +      boolean hasNext = currentIterator.hasNext();
    +      if (!hasNext) {
    +        if (counter < inputIterators.size()) {
    +          currentIterator = inputIterators.get(counter++);
    +        }
    +        hasNext = internalHasNext();
    +      }
    +      return hasNext;
    +    }
    +
    +    @Override public CarbonRowBatch next() {
    +      CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
    +      int count = 0;
    +      while (internalHasNext() && count < batchSize) {
    +        Object[] row = currentIterator.next();
    +        for (int i = 0; i < row.length; i++) {
    +          row[i] = genericParsers[i].parse(row[i].toString());
    --- End diff --
    
    This will involve many virtual function calls. Can we make it a `RowParser` and a `RowParserFactory`? Then later we can add code generation for it.
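A minimal sketch of the suggested row-level abstraction, under stated assumptions: the `GenericParser` interface below is a simplified stand-in for the PR's `org.apache.carbondata.processing.newflow.parser.GenericParser`, and `RowParserImpl` is a hypothetical name. The point of the interface is that a later code-generated implementation, specialized per schema, could replace the per-field dispatch without changing callers.

```java
// Simplified stand-in for the PR's GenericParser interface (assumption).
interface GenericParser<E> {
  E parse(String data);
}

// Row-level abstraction: callers make one call per row; a generated
// implementation could later inline the per-column parsing.
interface RowParser {
  Object[] parseRow(Object[] row);
}

class RowParserImpl implements RowParser {
  private final GenericParser<?>[] fieldParsers;

  RowParserImpl(GenericParser<?>[] fieldParsers) {
    this.fieldParsers = fieldParsers;
  }

  @Override public Object[] parseRow(Object[] row) {
    Object[] out = new Object[row.length];
    for (int i = 0; i < row.length; i++) {
      // Delegate each field to its column parser, as next() does today.
      out[i] = fieldParsers[i].parse(row[i].toString());
    }
    return out;
  }
}
```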



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84236755
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
    +
    +    private List<Iterator<Object[]>> inputIterators;
    +
    +    private GenericParser[] genericParsers;
    +
    +    private Iterator<Object[]> currentIterator;
    +
    +    private int counter;
    +
    +    private int batchSize;
    +
    +    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
    --- End diff --
    
    Please add a description comment to this class.



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84228406
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java ---
    @@ -0,0 +1,22 @@
    +package org.apache.carbondata.processing.newflow.parser;
    +
    +/**
    + * Parse the data according to implementation, The implementation classes can be struct, array or
    + * map datatypes.
    + */
    +public interface GenericParser<E> {
    +
    +  /**
    +   * Parse the data as per the delimiter
    +   * @param data
    +   * @return
    +   */
    +  E parse(String data);
    +
    +  /**
    +   * Children of the parser.
    +   * @param parser
    +   */
    +  void addChildren(GenericParser parser);
    --- End diff --
    
    This behavior applies only to parsers of complex types, so I think you can create a sub-interface `ComplexTypeParser` extending `GenericParser`. Or, even better, remove it here and add it only in `ArrayParserImpl` and `StructParserImpl`.
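The suggested split could look like the sketch below. `GenericParser` is simplified here, `ComplexTypeParser` follows the name proposed in this comment, and the `ArrayParserImpl` body is an illustrative assumption, not the PR's code:

```java
import java.util.regex.Pattern;

// Simplified stand-in for the PR's GenericParser interface (assumption).
interface GenericParser<E> {
  E parse(String data);
}

// Child handling lives only on the sub-interface for complex types.
interface ComplexTypeParser<E> extends GenericParser<E> {
  void addChildren(GenericParser<?> parser);
}

// Illustrative array parser: splits on a delimiter and applies the
// child parser to each element.
class ArrayParserImpl implements ComplexTypeParser<Object[]> {
  private final String delimiter;
  private GenericParser<?> child;

  ArrayParserImpl(String delimiter) {
    this.delimiter = delimiter;
  }

  @Override public void addChildren(GenericParser<?> parser) {
    this.child = parser;
  }

  @Override public Object[] parse(String data) {
    String[] parts = data.split(Pattern.quote(delimiter));
    Object[] out = new Object[parts.length];
    for (int i = 0; i < parts.length; i++) {
      out[i] = child.parse(parts[i]);
    }
    return out;
  }
}
```

A primitive-type parser would then implement only `GenericParser`, keeping `addChildren` off the common interface.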



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84234651
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +        GenericParser[] genericParsers, int batchSize) {
    +      this.inputIterators = inputIterators;
    +      this.genericParsers = genericParsers;
    +      this.batchSize = batchSize;
    +      currentIterator = inputIterators.get(counter++);
    --- End diff --
    
    `counter` is used without being explicitly initialized, and its meaning is not clear; can we use a better approach or add a comment?
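
A minimal runnable sketch of the kind of change the comment asks for (the class and field names here are hypothetical, not from the PR): the index gets an explicit initial value, a descriptive name, and a comment on what it tracks.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorSwitcher {
  private final List<Iterator<Integer>> inputIterators;
  // Index of the next child iterator to switch to; starts at the first one.
  private int nextIteratorIndex = 0;
  private Iterator<Integer> currentIterator;

  public IteratorSwitcher(List<Iterator<Integer>> inputIterators) {
    this.inputIterators = inputIterators;
    this.currentIterator = inputIterators.get(nextIteratorIndex++);
  }

  // Advance to the next child iterator whenever the current one is exhausted.
  public boolean hasNext() {
    while (!currentIterator.hasNext() && nextIteratorIndex < inputIterators.size()) {
      currentIterator = inputIterators.get(nextIteratorIndex++);
    }
    return currentIterator.hasNext();
  }

  public Integer next() {
    return currentIterator.next();
  }

  public static void main(String[] args) {
    IteratorSwitcher s = new IteratorSwitcher(Arrays.asList(
        Arrays.asList(1, 2).iterator(), Arrays.asList(3).iterator()));
    StringBuilder out = new StringBuilder();
    while (s.hasNext()) {
      out.append(s.next());
    }
    System.out.println(out); // 123
  }
}
```

The explicit `= 0` and the field name make the round-robin hand-off readable without tracing the constructor.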


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84231719
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    --- End diff --
    
    I think these functions can be shared across the project, so consider moving them into `CarbonProperties` directly, like `CarbonProperties.numberOfCores()` and `CarbonProperties.batchSize()`.
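
A minimal sketch of the shared helper this suggests (the class and method names below are hypothetical placeholders, not the actual `CarbonProperties` API): one defensive parse routine that both a `numberOfCores()`-style and a `batchSize()`-style accessor could reuse.

```java
public class PropertyUtil {
  // Parse an int property value, falling back to the default on bad input.
  // Assumes the default itself is a valid integer literal.
  static int parseIntOrDefault(String value, String defaultValue) {
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      return Integer.parseInt(defaultValue);
    }
  }

  public static void main(String[] args) {
    System.out.println(parseIntOrDefault("4", "2"));   // 4
    System.out.println(parseIntOrDefault("abc", "2")); // 2
  }
}
```

Centralizing the try/catch removes the duplicated parsing logic from each step implementation.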



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84475781
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java ---
    @@ -0,0 +1,19 @@
    +package org.apache.carbondata.processing.newflow.complexobjects;
    +
    +public class StructObject {
    +
    +  private Object[] data;
    +
    +  public StructObject(Object[] data) {
    +    this.data = data;
    +  }
    +
    +  public Object[] getData() {
    +    return data;
    +  }
    +
    +  public void setData(Object[] data) {
    --- End diff --
    
    I think `setData` makes more sense, since the complete struct data is passed at once instead of one member at a time.



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84475433
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java ---
    @@ -0,0 +1,40 @@
    +package org.apache.carbondata.processing.newflow.parser;
    +
    +import java.util.List;
    +
    +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.processing.newflow.parser.impl.ArrayParserImpl;
    +import org.apache.carbondata.processing.newflow.parser.impl.PrimitiveParserImpl;
    +import org.apache.carbondata.processing.newflow.parser.impl.StructParserImpl;
    +
    +public class CarbonParserFactory {
    +
    +  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters) {
    +    return createParser(carbonColumn, complexDelimiters, 0);
    +  }
    +
    +  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
    +      int counter) {
    +    switch (carbonColumn.getDataType()) {
    +      case ARRAY:
    +        List<CarbonDimension> listOfChildDimensions =
    +            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
    +        ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[counter]);
    +        for (CarbonDimension dimension : listOfChildDimensions) {
    +          arrayParser.addChildren(createParser(dimension, complexDelimiters, counter + 1));
    +        }
    +        return arrayParser;
    +      case STRUCT:
    +        List<CarbonDimension> dimensions =
    +            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
    +        StructParserImpl parser = new StructParserImpl(complexDelimiters[counter]);
    +        for (CarbonDimension dimension : dimensions) {
    +          parser.addChildren(createParser(dimension, complexDelimiters, counter + 1));
    +        }
    +        return parser;
    +      default:
    --- End diff --
    
    Map is not supported yet, so an unsupported-operation exception is thrown for Map.
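
A hypothetical sketch of the default branch this describes, with an explicit MAP case so the rejection is visible (the enum values and parser names below are illustrative stand-ins, not the actual factory code):

```java
public class ParserFactorySketch {
  enum DataType { ARRAY, STRUCT, MAP, STRING }

  // Return the parser to use for a column type; MAP is recognized but
  // rejected until it is supported.
  static String createParser(DataType type) {
    switch (type) {
      case MAP:
        throw new UnsupportedOperationException("Complex type MAP is not supported yet");
      case ARRAY:
        return "ArrayParserImpl";
      case STRUCT:
        return "StructParserImpl";
      default:
        return "PrimitiveParserImpl";
    }
  }

  public static void main(String[] args) {
    System.out.println(createParser(DataType.STRING)); // PrimitiveParserImpl
    try {
      createParser(DataType.MAP);
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected MAP");
    }
  }
}
```

Naming MAP as its own case documents the limitation instead of silently treating it as a primitive.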



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84234179
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    --- End diff --
    
    In this case, `genericParsers` should be thread-safe; please add a comment in the `GenericParser` interface and ensure it.
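
A small illustration of the thread-safety concern (class and method names hypothetical): the same parser instances are handed to every partition iterator, so a parser must keep no mutable per-call state. A stateless parse method like the one below is safe to share across threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedParserSketch {
  // Stateless: everything arrives as an argument and nothing is stored
  // between calls, so concurrent invocations cannot interfere.
  static String parsePrimitive(String data) {
    return data == null ? "" : data.trim();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<String>> results = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      final int n = i;
      // Each worker thread calls the shared parser concurrently.
      results.add(pool.submit(() -> parsePrimitive(" value" + n + " ")));
    }
    for (Future<String> f : results) {
      System.out.println(f.get());
    }
    pool.shutdown();
  }
}
```

If a parser had to cache state (e.g. a compiled delimiter pattern), that state would need to be immutable after construction to keep the sharing safe.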



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84229327
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java ---
    @@ -0,0 +1,19 @@
    +package org.apache.carbondata.processing.newflow.complexobjects;
    +
    +public class StructObject {
    +
    +  private Object[] data;
    +
    +  public StructObject(Object[] data) {
    +    this.data = data;
    +  }
    +
    +  public Object[] getData() {
    +    return data;
    +  }
    +
    +  public void setData(Object[] data) {
    --- End diff --
    
    instead of just setting data, I think it is better to add a function like `addMember(Object member)`, so that `StructParserImpl` can make use of `addMember`
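
A minimal sketch of the `addMember(Object member)` idea (the list-backed implementation is an assumption for illustration; the PR's class stores an `Object[]` directly):

```java
import java.util.ArrayList;
import java.util.List;

public class StructObjectSketch {
  // Members are accumulated one by one instead of the whole array being
  // set at once, which suits a parser that emits fields as it goes.
  private final List<Object> members = new ArrayList<>();

  public void addMember(Object member) {
    members.add(member);
  }

  public Object[] getData() {
    return members.toArray();
  }

  public static void main(String[] args) {
    StructObjectSketch s = new StructObjectSketch();
    s.addMember("a");
    s.addMember(1);
    System.out.println(s.getData().length); // 2
  }
}
```

Whether to accumulate or set once depends on how the struct parser produces its fields, which is the trade-off the two comments are debating.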



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84475299
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java ---
    @@ -114,11 +114,15 @@ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
       /**
        * It is called when task is called successfully.
        */
    -  public abstract void finish();
    +  public void finish() {
    +    // implementation classes can override to update the status.
    +  }
     
       /**
        * Closing of resources after step execution can be done here.
        */
    -  public abstract void close();
    +  public void close() {
    +    // implementation classes can override to close the resources if any available.
    --- End diff --
    
    Removed the `finish` method and kept only `close`, so `close` needs to be called in all cases.
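
A small illustration of the resulting contract (names hypothetical): with `finish()` gone, `close()` is the single cleanup hook, invoked on both the success and the failure path via try/finally.

```java
public class CloseContractSketch {
  static class Step {
    boolean closed = false;
    void execute() { /* process rows */ }
    void close() { closed = true; }
  }

  public static void main(String[] args) {
    Step step = new Step();
    try {
      step.execute();
    } finally {
      step.close(); // runs whether execute() succeeded or threw
    }
    System.out.println(step.closed); // true
  }
}
```

Collapsing the two lifecycle methods into one removes the ambiguity the earlier review comment raised about which of them releases resources.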



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84475851
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java ---
    @@ -33,4 +33,8 @@
       public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
     
       public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
    +
    --- End diff --
    
    Maybe we can refactor this later.



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-carbondata/pull/240



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84475566
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java ---
    @@ -0,0 +1,22 @@
    +package org.apache.carbondata.processing.newflow.parser;
    +
    +/**
    + * Parse the data according to implementation, The implementation classes can be struct, array or
    + * map datatypes.
    + */
    +public interface GenericParser<E> {
    +
    +  /**
    +   * Parse the data as per the delimiter
    +   * @param data
    +   * @return
    +   */
    +  E parse(String data);
    +
    +  /**
    +   * Children of the parser.
    +   * @param parser
    +   */
    +  void addChildren(GenericParser parser);
    --- End diff --
    
    Yes, added a new interface `ComplexParser` that extends `GenericParser`.
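
A rough sketch of the interface split this describes (the method signatures and implementations below are assumptions for illustration, not taken from the actual commit): `addChildren` moves to a `ComplexParser` sub-interface so primitive parsers stay free of child handling.

```java
import java.util.ArrayList;
import java.util.List;

public class ParserHierarchySketch {
  interface GenericParser<E> {
    E parse(String data);
  }

  // Only complex types (array, struct) manage child parsers.
  interface ComplexParser<E> extends GenericParser<E> {
    void addChildren(GenericParser<?> child);
  }

  // A primitive parser: no children to manage.
  static class PrimitiveParser implements GenericParser<String> {
    public String parse(String data) { return data; }
  }

  // An array parser splits on a delimiter and delegates to its child parser.
  static class ArrayParser implements ComplexParser<List<String>> {
    private final String delimiter;
    private GenericParser<?> child;
    ArrayParser(String delimiter) { this.delimiter = delimiter; }
    public void addChildren(GenericParser<?> child) { this.child = child; }
    public List<String> parse(String data) {
      List<String> out = new ArrayList<>();
      for (String part : data.split(delimiter)) {
        out.add((String) child.parse(part));
      }
      return out;
    }
  }

  public static void main(String[] args) {
    ArrayParser parser = new ArrayParser("\\$");
    parser.addChildren(new PrimitiveParser());
    System.out.println(parser.parse("a$b$c")); // [a, b, c]
  }
}
```

The split keeps `PrimitiveParserImpl` from carrying a no-op `addChildren`, which was the objection to the original single interface.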



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84227656
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java ---
    @@ -0,0 +1,40 @@
    +package org.apache.carbondata.processing.newflow.parser;
    +
    +import java.util.List;
    +
    +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.processing.newflow.parser.impl.ArrayParserImpl;
    +import org.apache.carbondata.processing.newflow.parser.impl.PrimitiveParserImpl;
    +import org.apache.carbondata.processing.newflow.parser.impl.StructParserImpl;
    +
    +public class CarbonParserFactory {
    +
    +  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters) {
    +    return createParser(carbonColumn, complexDelimiters, 0);
    +  }
    +
    +  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
    --- End diff --
    
    Please add function descriptions to these two `createParser` functions; it is not clear what the `counter` parameter is.



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84229694
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java ---
    @@ -33,4 +33,8 @@
       public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
     
       public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
    +
    --- End diff --
    
    Can you rename this class to a more meaningful name like `DataLoadOptions`? It will be exposed to the user as data load options, right?



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84230036
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    --- End diff --
    
    Move `configuration` to the next line.



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84226906
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java ---
    @@ -114,11 +114,15 @@ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
       /**
        * It is called when task is called successfully.
        */
    -  public abstract void finish();
    +  public void finish() {
    +    // implementation classes can override to update the status.
    +  }
     
       /**
        * Closing of resources after step execution can be done here.
        */
    -  public abstract void close();
    +  public void close() {
    +    // implementation classes can override to close the resources if any available.
    --- End diff --
    
    The comment is not clear: is this called in case of failure?
    Resources also need to be released in `finish()`, right?



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84233633
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    +    if (inputIterators.size() < numberOfCores) {
    +      numberOfCores = inputIterators.size();
    +    }
    +    List<Iterator<Object[]>>[] iterators = new List[numberOfCores];
    +    for (int i = 0; i < numberOfCores; i++) {
    +      iterators[i] = new ArrayList<>();
    +    }
    +
    +    for (int i = 0; i < inputIterators.size(); i++) {
    +      iterators[i % numberOfCores].add(inputIterators.get(i));
    +
    +    }
    +    return iterators;
    +  }
    +
    +  @Override protected CarbonRow processRow(CarbonRow row) {
    --- End diff --
    
     Please put `@Override` on its own line above the method signature, as per the coding standard
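For reference, the placement being requested looks like the sketch below (an illustrative stand-alone class, not code from this patch):

```java
public class OverrideStyleDemo {

  interface Greeter {
    String greet(String name);
  }

  // Preferred by the coding standard: `@Override` on its own line,
  // not prefixed to the method signature on the same line.
  static class EnglishGreeter implements Greeter {
    @Override
    public String greet(String name) {
      return "Hello, " + name;
    }
  }

  public static void main(String[] args) {
    System.out.println(new EnglishGreeter().greet("CarbonData"));
  }
}
```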


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84476461
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    +    if (inputIterators.size() < numberOfCores) {
    +      numberOfCores = inputIterators.size();
    +    }
    +    List<Iterator<Object[]>>[] iterators = new List[numberOfCores];
    +    for (int i = 0; i < numberOfCores; i++) {
    +      iterators[i] = new ArrayList<>();
    +    }
    +
    +    for (int i = 0; i < inputIterators.size(); i++) {
    +      iterators[i % numberOfCores].add(inputIterators.get(i));
    +
    +    }
    +    return iterators;
    +  }
    +
    +  @Override protected CarbonRow processRow(CarbonRow row) {
    +    return null;
    +  }
    +
    +  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
    +
    +    private List<Iterator<Object[]>> inputIterators;
    +
    +    private GenericParser[] genericParsers;
    +
    +    private Iterator<Object[]> currentIterator;
    +
    +    private int counter;
    +
    +    private int batchSize;
    +
    +    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
    +        GenericParser[] genericParsers, int batchSize) {
    +      this.inputIterators = inputIterators;
    +      this.genericParsers = genericParsers;
    +      this.batchSize = batchSize;
    +      currentIterator = inputIterators.get(counter++);
    +    }
    +
    +    @Override public boolean hasNext() {
    +      return internalHasNext();
    +    }
    +
    +    private boolean internalHasNext() {
    +      boolean hasNext = currentIterator.hasNext();
    +      if (!hasNext) {
    +        if (counter < inputIterators.size()) {
    +          currentIterator = inputIterators.get(counter++);
    +        }
    +        hasNext = internalHasNext();
    +      }
    +      return hasNext;
    +    }
    +
    +    @Override public CarbonRowBatch next() {
    +      CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
    +      int count = 0;
    +      while (internalHasNext() && count < batchSize) {
    +        Object[] row = currentIterator.next();
    +        for (int i = 0; i < row.length; i++) {
    +          row[i] = genericParsers[i].parse(row[i].toString());
    --- End diff --
    
    Yes, it is added


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84435567
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java ---
    @@ -0,0 +1,13 @@
    +package org.apache.carbondata.processing.newflow.parser;
    --- End diff --
    
    Please add the Apache license header to all new files; there are other files missing the header as well


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84476023
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    --- End diff --
    
    Yes, `GenericParser` is thread-safe; a comment has been added


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84227827
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java ---
    @@ -0,0 +1,40 @@
    +package org.apache.carbondata.processing.newflow.parser;
    +
    +import java.util.List;
    +
    +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.processing.newflow.parser.impl.ArrayParserImpl;
    +import org.apache.carbondata.processing.newflow.parser.impl.PrimitiveParserImpl;
    +import org.apache.carbondata.processing.newflow.parser.impl.StructParserImpl;
    +
    +public class CarbonParserFactory {
    +
    +  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters) {
    +    return createParser(carbonColumn, complexDelimiters, 0);
    +  }
    +
    +  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
    +      int counter) {
    +    switch (carbonColumn.getDataType()) {
    +      case ARRAY:
    +        List<CarbonDimension> listOfChildDimensions =
    +            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
    +        ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[counter]);
    +        for (CarbonDimension dimension : listOfChildDimensions) {
    +          arrayParser.addChildren(createParser(dimension, complexDelimiters, counter + 1));
    +        }
    +        return arrayParser;
    +      case STRUCT:
    +        List<CarbonDimension> dimensions =
    +            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
    +        StructParserImpl parser = new StructParserImpl(complexDelimiters[counter]);
    +        for (CarbonDimension dimension : dimensions) {
    +          parser.addChildren(createParser(dimension, complexDelimiters, counter + 1));
    +        }
    +        return parser;
    +      default:
    --- End diff --
    
    DataType can also be MAP. I think MAP has already been added to DataType, so we should add a case branch for it here; otherwise the logic is incorrect
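To make the suggestion concrete: a MAP branch would follow the same recursive shape as the ARRAY and STRUCT branches, building a parser for the key/value children at delimiter level + 1. The sketch below uses simplified stand-in types; the real `CarbonColumn`/`CarbonDimension` API, and any dedicated map parser implementation, are assumptions and not part of this patch:

```java
import java.util.ArrayList;
import java.util.List;

public class MapBranchSketch {

  enum DataType { PRIMITIVE, ARRAY, STRUCT, MAP }

  // Simplified stand-in for CarbonDimension: a data type plus children.
  static class Column {
    final DataType type;
    final List<Column> children = new ArrayList<>();
    Column(DataType type) { this.type = type; }
  }

  // Simplified stand-in for GenericParser.
  static class Parser {
    final DataType type;
    final List<Parser> children = new ArrayList<>();
    Parser(DataType type) { this.type = type; }
  }

  // Mirrors the shape of CarbonParserFactory.createParser, with the MAP
  // branch the review asks for: complex types recurse into their children
  // using the next complex delimiter (level + 1).
  static Parser createParser(Column column, String[] delimiters, int level) {
    switch (column.type) {
      case ARRAY:
      case STRUCT:
      case MAP:  // the missing branch, handled like the other complex types
        Parser parser = new Parser(column.type);
        for (Column child : column.children) {
          parser.children.add(createParser(child, delimiters, level + 1));
        }
        return parser;
      default:
        return new Parser(DataType.PRIMITIVE);
    }
  }

  public static void main(String[] args) {
    Column map = new Column(DataType.MAP);
    map.children.add(new Column(DataType.PRIMITIVE));
    Parser p = createParser(map, new String[] {"$", ":"}, 0);
    System.out.println(p.type + " with " + p.children.size() + " child");
  }
}
```

A real implementation would additionally need a key/value delimiter for map entries, which this sketch does not model.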


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84233734
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    --- End diff --
    
    Please add a comment in this function describing the parallelism
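The parallelism in question is a round-robin distribution of input iterators over at most NUM_CORES_LOADING buckets, each bucket later driven by one thread. A stand-alone sketch of that distribution (with simplified types; the `partition` signature here is an illustration, not the patch's actual API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class PartitionDemo {

  // Round-robin distribution mirroring partitionInputReaderIterators():
  // input iterator i goes to bucket i % min(cores, inputs.size()), so no
  // more buckets are created than there are iterators to fill them.
  static List<List<Iterator<Object[]>>> partition(
      List<Iterator<Object[]>> inputs, int cores) {
    int buckets = Math.min(cores, inputs.size());
    List<List<Iterator<Object[]>>> result = new ArrayList<>();
    for (int i = 0; i < buckets; i++) {
      result.add(new ArrayList<Iterator<Object[]>>());
    }
    for (int i = 0; i < inputs.size(); i++) {
      result.get(i % buckets).add(inputs.get(i));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Iterator<Object[]>> inputs = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      inputs.add(Collections.<Object[]>emptyList().iterator());
    }
    // 5 input iterators over 2 cores -> bucket sizes 3 and 2.
    List<List<Iterator<Object[]>>> parts = partition(inputs, 2);
    System.out.println(parts.size() + " buckets: "
        + parts.get(0).size() + "," + parts.get(1).size());
  }
}
```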


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84232050
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    --- End diff --
    
    Suggest changing this to `CarbonProperties.loadProcessBatchSize()`


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84476407
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[i].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    +    if (inputIterators.size() < numberOfCores) {
    +      numberOfCores = inputIterators.size();
    +    }
    +    List<Iterator<Object[]>>[] iterators = new List[numberOfCores];
    +    for (int i = 0; i < numberOfCores; i++) {
    +      iterators[i] = new ArrayList<>();
    +    }
    +
    +    for (int i = 0; i < inputIterators.size(); i++) {
    +      iterators[i % numberOfCores].add(inputIterators.get(i));
    +
    +    }
    +    return iterators;
    +  }
    +
    +  @Override protected CarbonRow processRow(CarbonRow row) {
    +    return null;
    +  }
    +
    +  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
    +
    +    private List<Iterator<Object[]>> inputIterators;
    +
    +    private GenericParser[] genericParsers;
    +
    +    private Iterator<Object[]> currentIterator;
    +
    +    private int counter;
    +
    +    private int batchSize;
    +
    +    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
    +        GenericParser[] genericParsers, int batchSize) {
    +      this.inputIterators = inputIterators;
    +      this.genericParsers = genericParsers;
    +      this.batchSize = batchSize;
    +      currentIterator = inputIterators.get(counter++);
    --- End diff --
    
    yes, it is corrected


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84227253
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java ---
    @@ -0,0 +1,18 @@
    +package org.apache.carbondata.processing.newflow.complexobjects;
    --- End diff --
    
    please add license header for all files


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84232604
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[i].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    +    if (inputIterators.size() < numberOfCores) {
    +      numberOfCores = inputIterators.size();
    --- End diff --
    
    please use a different name (suggest `parallelism`); the meaning of this variable is `min(inputIterators.size(), numberOfCores)`
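
For illustration, the suggested rename plus the round-robin split could be sketched as a standalone class. This is a hedged sketch, not the actual CarbonData code: the `PartitionSketch` class name, the `parallelism` local, and the `main` driver are illustrative stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionSketch {

  // Distributes the input iterators round-robin over at most
  // `numberOfCores` buckets. `parallelism` is the effective degree of
  // parallelism: min(inputIterators.size(), numberOfCores).
  static List<List<Iterator<Object[]>>> partition(
      List<Iterator<Object[]>> inputIterators, int numberOfCores) {
    int parallelism = Math.min(inputIterators.size(), numberOfCores);
    List<List<Iterator<Object[]>>> buckets = new ArrayList<>(parallelism);
    for (int i = 0; i < parallelism; i++) {
      buckets.add(new ArrayList<Iterator<Object[]>>());
    }
    for (int i = 0; i < inputIterators.size(); i++) {
      buckets.get(i % parallelism).add(inputIterators.get(i));
    }
    return buckets;
  }

  public static void main(String[] args) {
    // Five single-row iterators split across three cores.
    List<Iterator<Object[]>> inputs = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      inputs.add(Arrays.asList(new Object[][] {{i}}).iterator());
    }
    List<List<Iterator<Object[]>>> buckets = partition(inputs, 3);
    System.out.println(buckets.size());         // 3 buckets
    System.out.println(buckets.get(0).size());  // iterators 0 and 3
    System.out.println(buckets.get(2).size());  // iterator 2 only
  }
}
```

With five input iterators and three cores, the buckets end up with sizes 2, 2, 1, so no core is idle while another holds all the work.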


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84233387
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[i].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    +    if (inputIterators.size() < numberOfCores) {
    +      numberOfCores = inputIterators.size();
    +    }
    +    List<Iterator<Object[]>>[] iterators = new List[numberOfCores];
    +    for (int i = 0; i < numberOfCores; i++) {
    +      iterators[i] = new ArrayList<>();
    +    }
    +
    +    for (int i = 0; i < inputIterators.size(); i++) {
    +      iterators[i % numberOfCores].add(inputIterators.get(i));
    +
    --- End diff --
    
    remove empty line


---

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84236268
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[i].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    +    if (inputIterators.size() < numberOfCores) {
    +      numberOfCores = inputIterators.size();
    +    }
    +    List<Iterator<Object[]>>[] iterators = new List[numberOfCores];
    +    for (int i = 0; i < numberOfCores; i++) {
    +      iterators[i] = new ArrayList<>();
    +    }
    +
    +    for (int i = 0; i < inputIterators.size(); i++) {
    +      iterators[i % numberOfCores].add(inputIterators.get(i));
    +
    +    }
    +    return iterators;
    +  }
    +
    +  @Override protected CarbonRow processRow(CarbonRow row) {
    +    return null;
    +  }
    +
    +  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
    +
    +    private List<Iterator<Object[]>> inputIterators;
    +
    +    private GenericParser[] genericParsers;
    +
    +    private Iterator<Object[]> currentIterator;
    +
    +    private int counter;
    +
    +    private int batchSize;
    +
    +    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
    +        GenericParser[] genericParsers, int batchSize) {
    --- End diff --
    
    how do we ensure that the `genericParsers` passed in comply with the `DataField[]` returned by `getOutput` of this class? (The schema should match, and the `genericParsers` array should be consistent with that schema.)
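
One way to guarantee that alignment is to derive the parser array from the very array that `getOutput` returns, so parser `i` belongs to field `i` by construction rather than by convention. A minimal standalone sketch follows; the `Field` and `Parser` stand-in types are hypothetical simplifications, not the Carbon `DataField`/`GenericParser` classes.

```java
public class ParserSchemaSketch {

  // Hypothetical stand-in for DataField: just a named column.
  static class Field {
    final String name;
    Field(String name) { this.name = name; }
  }

  // Hypothetical stand-in for GenericParser: remembers which field it parses.
  static class Parser {
    final String fieldName;
    Parser(String fieldName) { this.fieldName = fieldName; }
  }

  // Building the parsers directly from the output field array keeps the
  // two arrays index-aligned: parsers[i] always corresponds to fields[i].
  static Parser[] createParsers(Field[] outputFields) {
    Parser[] parsers = new Parser[outputFields.length];
    for (int i = 0; i < outputFields.length; i++) {
      parsers[i] = new Parser(outputFields[i].name);
    }
    return parsers;
  }

  public static void main(String[] args) {
    Field[] output = { new Field("id"), new Field("name"), new Field("city") };
    Parser[] parsers = createParsers(output);
    for (int i = 0; i < parsers.length; i++) {
      System.out.println(parsers[i].fieldName.equals(output[i].name));
    }
  }
}
```

If the inner iterator then receives the same array that `initialize` built, the schema consistency question answers itself: there is only one source of field order.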


---