Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/03/30 05:12:13 UTC

[05/13] incubator-carbondata git commit: Removed kettle related code and refactored

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
deleted file mode 100644
index 2b54812..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdatastep;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.pentaho.di.core.CheckResultInterface;
-import org.pentaho.di.core.Counter;
-import org.pentaho.di.core.database.DatabaseMeta;
-import org.pentaho.di.core.exception.KettleException;
-import org.pentaho.di.core.exception.KettleXMLException;
-import org.pentaho.di.core.row.RowMetaInterface;
-import org.pentaho.di.core.xml.XMLHandler;
-import org.pentaho.di.i18n.BaseMessages;
-import org.pentaho.di.repository.ObjectId;
-import org.pentaho.di.repository.Repository;
-import org.pentaho.di.trans.Trans;
-import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.BaseStepMeta;
-import org.pentaho.di.trans.step.StepDataInterface;
-import org.pentaho.di.trans.step.StepInterface;
-import org.pentaho.di.trans.step.StepMeta;
-import org.pentaho.di.trans.step.StepMetaInterface;
-import org.w3c.dom.Node;
-
-public class SortKeyStepMeta extends BaseStepMeta implements StepMetaInterface {
-  /**
-   * PKG
-   */
-  private static final Class<?> PKG = SortKeyStepMeta.class;
-
-  /**
-   * tabelName
-   */
-  private String tabelName;
-
-  /**
-   * outputRowSize
-   */
-  private String outputRowSize;
-
-  /**
-   * tableName
-   */
-  private String tableName;
-
-  /**
-   * databaseName
-   */
-  private String databaseName;
-
-  /**
-   * Dimension Count
-   */
-  private String dimensionCount;
-
-  /**
-   * ComplexTypes Count
-   */
-  private String complexDimensionCount;
-
-  /**
-   * No dictionary dimension count
-   */
-  private int noDictionaryCount;
-
-  /**
-   * measureCount
-   */
-  private String measureCount;
-
-  private String factDimLensString;
-
-  /**
-   * isUpdateMemberRequest
-   */
-  private String updateMemberRequest;
-
-  private String measureDataType;
-
-  private String noDictionaryDims;
-  /**
-   * partitionID
-   */
-  private String partitionID;
-  /**
-   * Id of the load folder
-   */
-  private String segmentId;
-  /**
-   * task id, each spark task has a unique id
-   */
-  private String taskNo;
-  /**
-   * To determine whether a column is dictionary encoded or not.
-   */
-  private String noDictionaryDimsMapping;
-
-  /**
-   * set the default value for all the properties
-   */
-  @Override public void setDefault() {
-    this.tabelName = "";
-    factDimLensString = "";
-    outputRowSize = "";
-    databaseName = "";
-    noDictionaryDims = "";
-    noDictionaryDimsMapping = "";
-    tableName = "";
-    dimensionCount = "";
-    complexDimensionCount = "";
-    measureCount = "";
-    updateMemberRequest = "";
-    measureDataType = "";
-    partitionID = "";
-    segmentId = "";
-    taskNo = "";
-  }
-
-  /**
-   * Get the XML that represents the values in this step
-   *
-   * @return the XML that represents the metadata in this step
-   * @throws KettleException in case there is a conversion or XML encoding error
-   */
-  public String getXML() {
-    StringBuilder retval = new StringBuilder(150);
-    retval.append("    ").append(XMLHandler.addTagValue("TableName", this.tabelName));
-    retval.append("    ").append(XMLHandler.addTagValue("factDimLensString", factDimLensString));
-    retval.append("    ").append(XMLHandler.addTagValue("outputRowSize", this.outputRowSize));
-    retval.append("    ").append(XMLHandler.addTagValue("tableName", this.tableName));
-    retval.append("    ").append(XMLHandler.addTagValue("databaseName", this.databaseName));
-    retval.append("    ").append(XMLHandler.addTagValue("dimensionCount", this.dimensionCount));
-    retval.append("    ").append(XMLHandler.addTagValue("noDictionaryDims", this.noDictionaryDims));
-    retval.append("    ")
-        .append(XMLHandler.addTagValue("noDictionaryDimsMapping", this.noDictionaryDimsMapping));
-    retval.append("    ")
-        .append(XMLHandler.addTagValue("complexDimensionCount", this.complexDimensionCount));
-    retval.append("    ").append(XMLHandler.addTagValue("measureCount", this.measureCount));
-    retval.append("    ")
-        .append(XMLHandler.addTagValue("isUpdateMemberRequest", this.updateMemberRequest));
-    retval.append("    ").append(XMLHandler.addTagValue("measureDataType", measureDataType));
-    retval.append("    ").append(XMLHandler.addTagValue("partitionID", partitionID));
-    retval.append("    ").append(XMLHandler.addTagValue("segmentId", segmentId));
-    retval.append("    ").append(XMLHandler.addTagValue("taskNo", taskNo));
-    return retval.toString();
-  }
-
-  /**
-   * Load the values for this step from an XML Node
-   *
-   * @param stepnode  the Node to get the info from
-   * @param databases The available list of databases to reference to
-   * @param counters  Counters to reference.
-   * @throws KettleXMLException When an unexpected XML error occurred. (malformed etc.)
-   */
-  public void loadXML(Node stepnode, List<DatabaseMeta> databases, Map<String, Counter> counters)
-      throws KettleXMLException {
-    try {
-      this.tabelName = XMLHandler.getTagValue(stepnode, "TableName");
-      this.outputRowSize = XMLHandler.getTagValue(stepnode, "outputRowSize");
-      this.factDimLensString = XMLHandler.getTagValue(stepnode, "factDimLensString");
-      this.tableName = XMLHandler.getTagValue(stepnode, "tableName");
-      this.databaseName = XMLHandler.getTagValue(stepnode, "databaseName");
-      this.dimensionCount = XMLHandler.getTagValue(stepnode, "dimensionCount");
-      this.noDictionaryDims = XMLHandler.getTagValue(stepnode, "noDictionaryDims");
-      this.noDictionaryDimsMapping = XMLHandler.getTagValue(stepnode, "noDictionaryDimsMapping");
-      this.complexDimensionCount = XMLHandler.getTagValue(stepnode, "complexDimensionCount");
-      this.measureCount = XMLHandler.getTagValue(stepnode, "measureCount");
-      this.updateMemberRequest = XMLHandler.getTagValue(stepnode, "isUpdateMemberRequest");
-      this.measureDataType = XMLHandler.getTagValue(stepnode, "measureDataType");
-      this.partitionID = XMLHandler.getTagValue(stepnode, "partitionID");
-      this.segmentId = XMLHandler.getTagValue(stepnode, "segmentId");
-      this.taskNo = XMLHandler.getTagValue(stepnode, "taskNo");
-    } catch (Exception e) {
-      throw new KettleXMLException("Unable to read step info from XML node", e);
-    }
-  }
-
-  /**
-   * Save the step's data into a Kettle repository
-   *
-   * @param rep              The Kettle repository to save to
-   * @param idTransformation The transformation ID
-   * @param idStep           The step ID
-   * @throws KettleException When an unexpected error occurred (database, network, etc)
-   */
-  public void saveRep(Repository rep, ObjectId idTransformation, ObjectId idStep)
-      throws KettleException {
-    try {
-      rep.saveStepAttribute(idTransformation, idStep, "TableName", this.tabelName);
-
-      rep.saveStepAttribute(idTransformation, idStep, "factDimLensString", factDimLensString);
-      rep.saveStepAttribute(idTransformation, idStep, "outputRowSize", this.outputRowSize);
-      rep.saveStepAttribute(idTransformation, idStep, "tableName", this.tableName);
-      rep.saveStepAttribute(idTransformation, idStep, "databaseName", this.databaseName);
-      rep.saveStepAttribute(idTransformation, idStep, "dimensionCount", this.dimensionCount);
-      rep.saveStepAttribute(idTransformation, idStep, "noDictionaryDims", this.noDictionaryDims);
-      rep.saveStepAttribute(idTransformation, idStep, "noDictionaryDimsMapping",
-          this.noDictionaryDimsMapping);
-      rep.saveStepAttribute(idTransformation, idStep, "complexDimensionCount",
-          this.complexDimensionCount);
-      rep.saveStepAttribute(idTransformation, idStep, "measureCount", this.measureCount);
-      rep.saveStepAttribute(idTransformation, idStep, "isUpdateMemberRequest",
-          this.updateMemberRequest);
-      rep.saveStepAttribute(idTransformation, idStep, "measureDataType", measureDataType);
-      rep.saveStepAttribute(idTransformation, idStep, "partitionID", partitionID);
-      rep.saveStepAttribute(idTransformation, idStep, "segmentId", segmentId);
-      rep.saveStepAttribute(idTransformation, idStep, "taskNo", taskNo);
-    } catch (Exception e) {
-      throw new KettleException(BaseMessages
-          .getString(PKG, "TemplateStep.Exception.UnableToSaveStepInfoToRepository", new String[0])
-          + idStep, e);
-    }
-  }
-
-  /**
-   * Read the step's information from a Kettle repository
-   *
-   * @param rep       The repository to read from
-   * @param idStep    The step ID
-   * @param databases The databases to reference
-   * @param counters  The counters to reference
-   * @throws KettleException When an unexpected error occurred (database, network, etc)
-   */
-  public void readRep(Repository rep, ObjectId idStep, List<DatabaseMeta> databases,
-      Map<String, Counter> counters) throws KettleException {
-    try {
-      this.tabelName = rep.getStepAttributeString(idStep, "TableName");
-      this.outputRowSize = rep.getStepAttributeString(idStep, "outputRowSize");
-      this.databaseName = rep.getStepAttributeString(idStep, "databaseName");
-      this.tableName = rep.getStepAttributeString(idStep, "tableName");
-      this.dimensionCount = rep.getStepAttributeString(idStep, "dimensionCount");
-      this.noDictionaryDims = rep.getStepAttributeString(idStep, "noDictionaryDims");
-      this.noDictionaryDims = rep.getStepAttributeString(idStep, "noDictionaryDimsMapping");
-      this.complexDimensionCount = rep.getStepAttributeString(idStep, "complexDimensionCount");
-      this.measureCount = rep.getStepAttributeString(idStep, "measureCount");
-      this.updateMemberRequest = rep.getStepAttributeString(idStep, "isUpdateMemberRequest");
-      this.measureDataType = rep.getStepAttributeString(idStep, "measureDataType");
-      this.partitionID = rep.getStepAttributeString(idStep, "partitionID");
-      this.segmentId = rep.getStepAttributeString(idStep, "segmentId");
-      this.taskNo = rep.getStepAttributeString(idStep, "taskNo");
-    } catch (Exception ex) {
-      throw new KettleException(BaseMessages
-          .getString(PKG, "CarbonDataWriterStepMeta.Exception.UnexpectedErrorInReadingStepInfo",
-              new String[0]), ex);
-    }
-  }
-
-  /**
-   * Checks the settings of this step and puts the findings in a remarks List.
-   *
-   * @param remarks  The list to put the remarks in @see
-   *                 org.pentaho.di.core.CheckResult
-   * @param stepMeta The stepMeta to help checking
-   * @param prev     The fields coming from the previous step
-   * @param input    The input step names
-   * @param output   The output step names
-   * @param info     The fields that are used as information by the step
-   */
-  public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta,
-      RowMetaInterface prev, String[] input, String[] output, RowMetaInterface info) {
-    CarbonDataProcessorUtil.checkResult(remarks, stepMeta, input);
-  }
-
-  /**
-   * Get the executing step, needed by Trans to launch a step.
-   *
-   * @param stepMeta          The step info
-   * @param stepDataInterface the step data interface linked to this step. Here the step can
-   *                          store temporary data, database connections, etc.
-   * @param copyNr            The copy nr to get
-   * @param transMeta         The transformation info
-   * @param trans             The launching transformation
-   */
-  public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr,
-      TransMeta transMeta, Trans trans) {
-    return new SortKeyStep(stepMeta, stepDataInterface, copyNr, transMeta, trans);
-  }
-
-  /**
-   * Get a new instance of the appropriate data class. This data class
-   * implements the StepDataInterface. It basically contains the persisting
-   * data that needs to live on, even if a worker thread is terminated.
-   *
-   * @return The appropriate StepDataInterface class.
-   */
-  public StepDataInterface getStepData() {
-    return new SortKeyStepData();
-  }
-
-  /**
-   * Below method will be used to get the output row size
-   *
-   * @return outputRowSize
-   */
-  public String getOutputRowSize() {
-    return outputRowSize;
-  }
-
-  /**
-   * Below method will be used to set the output row size
-   *
-   * @param outputRowSize
-   */
-  public void setOutputRowSize(String outputRowSize) {
-    this.outputRowSize = outputRowSize;
-  }
-
-  /**
-   * This method will return the table name
-   *
-   * @return tabelName
-   */
-
-  public String getTabelName() {
-    return this.tabelName;
-  }
-
-  /**
-   * This method will set the table name
-   *
-   * @param tabelName
-   */
-  public void setTabelName(String tabelName) {
-    this.tabelName = tabelName;
-  }
-
-  /**
-   * @param tableName the tableName to set
-   */
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  /**
-   * @return the databaseName
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * @param databaseName the databaseName to set
-   */
-  public void setDatabaseName(String databaseName) {
-    this.databaseName = databaseName;
-  }
-
-  /**
-   * @return the dimensionCount
-   */
-  public int getDimensionCount() {
-    return Integer.parseInt(dimensionCount);
-  }
-
-  public void setDimensionCount(String dimensionCount) {
-    this.dimensionCount = dimensionCount;
-  }
-
-  /**
-   * @return the complexDimensionCount
-   */
-  public int getComplexDimensionCount() {
-    return Integer.parseInt(complexDimensionCount);
-  }
-
-  public void setComplexDimensionCount(String complexDimensionCount) {
-    this.complexDimensionCount = complexDimensionCount;
-  }
-
-  /**
-   * @return the measureCount
-   */
-  public int getMeasureCount() {
-    return Integer.parseInt(measureCount);
-  }
-
-  /**
-   * @param measureCount the measureCount to set
-   */
-  public void setMeasureCount(String measureCount) {
-    this.measureCount = measureCount;
-  }
-
-  /**
-   * @param isUpdateMemberRequest the isUpdateMemberRequest to set
-   */
-  public void setIsUpdateMemberRequest(String isUpdateMemberRequest) {
-    this.updateMemberRequest = isUpdateMemberRequest;
-  }
-
-  public void setMeasureDataType(String measureDataType) {
-    this.measureDataType = measureDataType;
-  }
-
-  public String getNoDictionaryDims() {
-    return noDictionaryDims;
-  }
-
-  public void setNoDictionaryDims(String noDictionaryDims) {
-    this.noDictionaryDims = noDictionaryDims;
-  }
-
-  /**
-   * @return the noDictionaryCount
-   */
-  public int getNoDictionaryCount() {
-    return noDictionaryCount;
-  }
-
-  /**
-   * @param noDictionaryCount the noDictionaryCount to set
-   */
-  public void setNoDictionaryCount(int noDictionaryCount) {
-    this.noDictionaryCount = noDictionaryCount;
-  }
-
-  /**
-   * @return partitionId
-   */
-  public String getPartitionID() {
-    return partitionID;
-  }
-
-  /**
-   * @param partitionID
-   */
-  public void setPartitionID(String partitionID) {
-    this.partitionID = partitionID;
-  }
-
-  /**
-   * return segmentId
-   *
-   * @return
-   */
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  /**
-   * set segment Id
-   *
-   * @param segmentId
-   */
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-
-  /**
-   * @param taskNo
-   */
-  public void setTaskNo(String taskNo) {
-    this.taskNo = taskNo;
-  }
-
-  /**
-   * @return
-   */
-  public String getTaskNo() {
-    return taskNo;
-  }
-
-  public String getNoDictionaryDimsMapping() {
-    return noDictionaryDimsMapping;
-  }
-
-  public void setNoDictionaryDimsMapping(String noDictionaryDimsMapping) {
-    this.noDictionaryDimsMapping = noDictionaryDimsMapping;
-  }
-}
\ No newline at end of file
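
The deleted SortKeyStepMeta above existed only to satisfy Kettle's StepMetaInterface: it round-trips the sort step's configuration through Kettle XML (getXML/loadXML) and repository attributes (saveRep/readRep). A minimal sketch of that XML round trip, assuming the kettle-core jar is on the classpath; the <step> wrapper and the JDK parser are illustrative additions, not part of the original class:

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.pentaho.di.core.xml.XMLHandler;
import org.w3c.dom.Node;

public class SortKeyXmlRoundTrip {
  public static void main(String[] args) throws Exception {
    // Serialize one tag the way SortKeyStepMeta.getXML() did.
    String xml = "<step>" + XMLHandler.addTagValue("segmentId", "0") + "</step>";
    // Parse with the JDK, then read the tag back the way loadXML() did.
    Node stepnode = DocumentBuilderFactory.newInstance().newDocumentBuilder()
        .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
        .getDocumentElement();
    System.out.println(XMLHandler.getTagValue(stepnode, "segmentId")); // prints 0
  }
}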

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 3806c55..2affa03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -65,13 +65,13 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.mdkeygen.file.FileManager;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
 import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder;
 import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax;
 import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
 import org.apache.carbondata.processing.store.colgroup.DataHolder;
+import org.apache.carbondata.processing.store.file.FileManager;
+import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
@@ -256,8 +256,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private boolean compactionFlow;
 
-  private boolean useKettle;
-
   private int bucketNumber;
 
   private long schemaUpdatedTimeStamp;
@@ -279,7 +277,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dimensionCount = carbonFactDataHandlerModel.getDimensionCount();
     this.complexIndexMap = carbonFactDataHandlerModel.getComplexIndexMap();
     this.primitiveDimLens = carbonFactDataHandlerModel.getPrimitiveDimLens();
-    this.useKettle = carbonFactDataHandlerModel.isUseKettle();
     this.isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
             CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
@@ -481,144 +478,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
   }
 
-  // TODO remove after kettle flow is removed
-  private NodeHolder processDataRows(List<Object[]> dataRows) throws CarbonDataWriterException {
-    Object[] max = new Object[measureCount];
-    Object[] min = new Object[measureCount];
-    int[] decimal = new int[measureCount];
-    Object[] uniqueValue = new Object[measureCount];
-    // to store index of the measure columns which are null
-    BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
-    for (int i = 0; i < max.length; i++) {
-      if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        max[i] = Long.MIN_VALUE;
-      } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
-        max[i] = -Double.MAX_VALUE;
-      } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        max[i] = new BigDecimal(-Double.MAX_VALUE);
-      } else {
-        max[i] = 0.0;
-      }
-    }
-    for (int i = 0; i < min.length; i++) {
-      if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        min[i] = Long.MAX_VALUE;
-        uniqueValue[i] = Long.MIN_VALUE;
-      } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
-        min[i] = Double.MAX_VALUE;
-        uniqueValue[i] = Double.MIN_VALUE;
-      } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        min[i] = new BigDecimal(Double.MAX_VALUE);
-        uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
-      } else {
-        min[i] = 0.0;
-        uniqueValue[i] = 0.0;
-      }
-    }
-    for (int i = 0; i < decimal.length; i++) {
-      decimal[i] = 0;
-    }
-
-    byte[] startKey = null;
-    byte[] endKey = null;
-    byte[] noDictStartKey = null;
-    byte[] noDictEndKey = null;
-    CarbonWriteDataHolder[] dataHolder = initialiseDataHolder(dataRows.size());
-    CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolder(dataRows.size());
-    CarbonWriteDataHolder noDictionaryKeyDataHolder = null;
-    if ((noDictionaryCount + complexColCount) > 0) {
-      noDictionaryKeyDataHolder = initialiseKeyBlockHolder(dataRows.size());
-    }
-
-    for (int count = 0; count < dataRows.size(); count++) {
-      Object[] row = dataRows.get(count);
-      byte[] mdKey = (byte[]) row[this.mdKeyIndex];
-      byte[] noDictionaryKey = null;
-      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-        noDictionaryKey = (byte[]) row[this.mdKeyIndex - 1];
-      }
-      ByteBuffer byteBuffer = null;
-      byte[] b = null;
-      if (count == 0) {
-        startKey = mdKey;
-        noDictStartKey = noDictionaryKey;
-      }
-      endKey = mdKey;
-      noDictEndKey = noDictionaryKey;
-      // add to key store
-      if (mdKey.length > 0) {
-        keyDataHolder.setWritableByteArrayValueByIndex(count, mdKey);
-      }
-      // for storing the byte [] for high card.
-      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-        noDictionaryKeyDataHolder.setWritableByteArrayValueByIndex(count, noDictionaryKey);
-      }
-      //Add all columns to keyDataHolder
-      keyDataHolder.setWritableByteArrayValueByIndex(count, this.mdKeyIndex, row);
-      // CHECKSTYLE:OFF Approval No:Approval-351
-      for (int k = 0; k < otherMeasureIndex.length; k++) {
-        if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          if (null == row[otherMeasureIndex[k]]) {
-            nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
-            dataHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
-          } else {
-            dataHolder[otherMeasureIndex[k]]
-                .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
-          }
-        } else {
-          if (null == row[otherMeasureIndex[k]]) {
-            nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
-            dataHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
-          } else {
-            dataHolder[otherMeasureIndex[k]]
-                .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
-          }
-        }
-      }
-      calculateMaxMin(max, min, decimal, otherMeasureIndex, row);
-      for (int i = 0; i < customMeasureIndex.length; i++) {
-        if (null == row[customMeasureIndex[i]]
-            && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          BigDecimal val = BigDecimal.valueOf(0);
-          b = DataTypeUtil.bigDecimalToByte(val);
-          nullValueIndexBitSet[customMeasureIndex[i]].set(count);
-        } else {
-          if (this.compactionFlow) {
-            BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
-            b = DataTypeUtil.bigDecimalToByte(bigDecimal);
-          } else {
-            b = (byte[]) row[customMeasureIndex[i]];
-          }
-        }
-        byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
-        byteBuffer.putInt(b.length);
-        byteBuffer.put(b);
-        byteBuffer.flip();
-        b = byteBuffer.array();
-        dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
-      }
-      calculateMaxMin(max, min, decimal, customMeasureIndex, row);
-    }
-    calculateUniqueValue(min, uniqueValue);
-    byte[][] byteArrayValues = keyDataHolder.getByteArrayValues().clone();
-    byte[][] noDictionaryValueHolder = null;
-    if ((noDictionaryCount + complexColCount) > 0) {
-      noDictionaryValueHolder = noDictionaryKeyDataHolder.getByteArrayValues();
-    }
-    WriterCompressModel compressionModel = ValueCompressionUtil
-        .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[max.length]);
-    byte[][] writableMeasureDataArray =
-        StoreFactory.createDataStore(compressionModel).getWritableMeasureDataArray(dataHolder)
-            .clone();
-    NodeHolder nodeHolder =
-        getNodeHolderObject(writableMeasureDataArray, byteArrayValues, dataRows.size(), startKey,
-            endKey, compressionModel, noDictionaryValueHolder, noDictStartKey, noDictEndKey,
-            nullValueIndexBitSet);
-    LOGGER.info("Number Of records processed: " + dataRows.size());
-    return nodeHolder;
-  }
-
-  private NodeHolder processDataRowsWithOutKettle(List<Object[]> dataRows)
+  private NodeHolder processDataRows(List<Object[]> dataRows)
       throws CarbonDataWriterException {
     Object[] max = new Object[measureCount];
     Object[] min = new Object[measureCount];
@@ -661,7 +521,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     byte[][] noDictStartKey = null;
     byte[][] noDictEndKey = null;
     CarbonWriteDataHolder[] dataHolder = initialiseDataHolder(dataRows.size());
-    CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolderWithOutKettle(dataRows.size());
+    CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolder(dataRows.size());
     CarbonWriteDataHolder noDictionaryKeyDataHolder = null;
     if ((noDictionaryCount + complexColCount) > 0) {
       noDictionaryKeyDataHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
@@ -756,142 +616,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         StoreFactory.createDataStore(compressionModel).getWritableMeasureDataArray(dataHolder)
             .clone();
     NodeHolder nodeHolder =
-        getNodeHolderObjectWithOutKettle(writableMeasureDataArray, byteArrayValues, dataRows.size(),
+        getNodeHolderObject(writableMeasureDataArray, byteArrayValues, dataRows.size(),
             startKey, endKey, compressionModel, noDictionaryValueHolder, noDictStartKey,
             noDictEndKey, nullValueIndexBitSet);
     LOGGER.info("Number Of records processed: " + dataRows.size());
     return nodeHolder;
   }
 
-  // TODO remove after kettle flow is removed
-  private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, byte[][] byteArrayValues,
-      int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
-      WriterCompressModel compressionModel, byte[][] noDictionaryData, byte[] noDictionaryStartKey,
-      byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet) throws CarbonDataWriterException {
-    byte[][][] noDictionaryColumnsData = null;
-    List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
-    int complexColCount = getComplexColsCount();
-
-    for (int i = 0; i < complexColCount; i++) {
-      colsAndValues.add(new ArrayList<byte[]>());
-    }
-    int noOfColumn = colGrpModel.getNoOfColumnStore();
-    DataHolder[] dataHolders = getDataHolders(noOfColumn, byteArrayValues.length);
-    for (int i = 0; i < byteArrayValues.length; i++) {
-      byte[][] splitKey = columnarSplitter.splitKey(byteArrayValues[i]);
-
-      for (int j = 0; j < splitKey.length; j++) {
-        dataHolders[j].addData(splitKey[j], i);
-      }
-    }
-    if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-      noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryData.length][];
-      for (int i = 0; i < noDictionaryData.length; i++) {
-        int complexColumnIndex = primitiveDimLens.length + noDictionaryCount;
-        byte[][] splitKey = NonDictionaryUtil
-            .splitNoDictionaryKey(noDictionaryData[i], noDictionaryCount + complexIndexMap.size());
-
-        int complexTypeIndex = 0;
-        for (int j = 0; j < splitKey.length; j++) {
-          //nodictionary Columns
-          if (j < noDictionaryCount) {
-            noDictionaryColumnsData[j][i] = splitKey[j];
-          }
-          //complex types
-          else {
-            // Need to write columnar block from complex byte array
-            int index = complexColumnIndex - noDictionaryCount;
-            GenericDataType complexDataType = complexIndexMap.get(index);
-            complexColumnIndex++;
-            if (complexDataType != null) {
-              List<ArrayList<byte[]>> columnsArray = new ArrayList<ArrayList<byte[]>>();
-              for (int k = 0; k < complexDataType.getColsCount(); k++) {
-                columnsArray.add(new ArrayList<byte[]>());
-              }
-
-              try {
-                ByteBuffer complexDataWithoutBitPacking = ByteBuffer.wrap(splitKey[j]);
-                byte[] complexTypeData = new byte[complexDataWithoutBitPacking.getShort()];
-                complexDataWithoutBitPacking.get(complexTypeData);
-
-                ByteBuffer byteArrayInput = ByteBuffer.wrap(complexTypeData);
-                ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
-                DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
-                complexDataType
-                    .parseAndBitPack(byteArrayInput, dataOutputStream, this.complexKeyGenerator);
-                complexDataType.getColumnarDataForComplexType(columnsArray,
-                    ByteBuffer.wrap(byteArrayOutput.toByteArray()));
-                byteArrayOutput.close();
-              } catch (IOException e) {
-                throw new CarbonDataWriterException(
-                    "Problem while bit packing and writing complex datatype", e);
-              } catch (KeyGenException e) {
-                throw new CarbonDataWriterException(
-                    "Problem while bit packing and writing complex datatype", e);
-              }
-
-              for (ArrayList<byte[]> eachColumn : columnsArray) {
-                colsAndValues.get(complexTypeIndex++).addAll(eachColumn);
-              }
-            } else {
-              // This case is not possible because complex types are the last columns
-            }
-          }
-        }
-      }
-    }
-    thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
-            CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
-    ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
-    List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
-        primitiveDimLens.length + noDictionaryCount + complexColCount);
-    int i = 0;
-    int dictionaryColumnCount = -1;
-    int noDictionaryColumnCount = -1;
-    for (i = 0; i < dimensionType.length; i++) {
-      if (dimensionType[i]) {
-        dictionaryColumnCount++;
-        if (colGrpModel.isColumnar(dictionaryColumnCount)) {
-          submit.add(executorService.submit(
-              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
-                  isUseInvertedIndex[i])));
-        } else {
-          submit.add(
-              executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
-        }
-      } else {
-        submit.add(executorService.submit(
-            new BlockSortThread(i, noDictionaryColumnsData[++noDictionaryColumnCount], false, true,
-                true, isUseInvertedIndex[i])));
-      }
-    }
-    for (int k = 0; k < complexColCount; k++) {
-      submit.add(executorService.submit(new BlockSortThread(i++,
-          colsAndValues.get(k).toArray(new byte[colsAndValues.get(k).size()][]), false, true)));
-    }
-    executorService.shutdown();
-    try {
-      executorService.awaitTermination(1, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    IndexStorage[] blockStorage =
-        new IndexStorage[colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount];
-    try {
-      for (int k = 0; k < blockStorage.length; k++) {
-        blockStorage[k] = submit.get(k).get();
-      }
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return this.dataWriter
-        .buildDataNodeHolder(blockStorage, dataHolderLocal, entryCountLocal, startkeyLocal,
-            endKeyLocal, compressionModel, noDictionaryStartKey, noDictionaryEndKey,
-            nullValueIndexBitSet);
-  }
-
-  private NodeHolder getNodeHolderObjectWithOutKettle(byte[][] dataHolderLocal,
+  private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal,
       byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
       WriterCompressModel compressionModel, byte[][][] noDictionaryData,
       byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
@@ -1343,16 +1075,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return blockKeySizeWithComplexTypes;
   }
 
-  // TODO Remove after kettle flow got removed.
   private CarbonWriteDataHolder initialiseKeyBlockHolder(int size) {
     CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
-    keyDataHolder.initialiseByteArrayValues(size);
-    return keyDataHolder;
-  }
-
-  private CarbonWriteDataHolder initialiseKeyBlockHolderWithOutKettle(int size) {
-    CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
-    keyDataHolder.initialiseByteArrayValuesWithOutKettle(size);
+    keyDataHolder.initialiseByteArrayValuesForKey(size);
     return keyDataHolder;
   }
 
@@ -1556,12 +1281,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     @Override public Void call() throws Exception {
       try {
-        NodeHolder nodeHolder;
-        if (useKettle) {
-          nodeHolder = processDataRows(dataRows);
-        } else {
-          nodeHolder = processDataRowsWithOutKettle(dataRows);
-        }
+        NodeHolder nodeHolder = processDataRows(dataRows);
         // insert the object in array according to sequence number
         int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
         blockletDataHolder.put(nodeHolder, indexInNodeHolderArray);
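
With the duplicate kettle path gone, the surviving processDataRows keeps the per-measure-type min/max seeding visible in the deleted hunk above: max starts at the smallest representable value so any row can raise it, and min at the largest so any row can lower it. A compact sketch of that rule; the char codes 'l', 'n' and 'b' are hypothetical stand-ins for CarbonCommonConstants.BIG_INT_MEASURE, SUM_COUNT_VALUE_MEASURE and BIG_DECIMAL_MEASURE:

import java.math.BigDecimal;

public class MeasureMinMaxSeed {
  static Object seedMax(char type) {
    if (type == 'l') return Long.MIN_VALUE;                    // big int measure
    if (type == 'n') return -Double.MAX_VALUE;                 // double measure
    if (type == 'b') return new BigDecimal(-Double.MAX_VALUE); // decimal measure
    return 0.0;
  }
  static Object seedMin(char type) {
    if (type == 'l') return Long.MAX_VALUE;
    if (type == 'n') return Double.MAX_VALUE;
    if (type == 'b') return new BigDecimal(Double.MAX_VALUE);
    return 0.0;
  }
}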

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 59f2eb3..ffd23a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -171,11 +171,6 @@ public class CarbonFactDataHandlerModel {
    */
   private boolean isCompactionFlow;
 
-  /**
-   * To use kettle flow to load or not.
-   */
-  private boolean useKettle = true;
-
   private int bucketId = 0;
 
   private String segmentId;
@@ -287,7 +282,6 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndex);
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
-    carbonFactDataHandlerModel.setUseKettle(false);
     if (noDictionaryCount > 0 || complexDimensionCount > 0) {
       carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
     } else {
@@ -500,14 +494,6 @@ public class CarbonFactDataHandlerModel {
     this.wrapperColumnSchema = wrapperColumnSchema;
   }
 
-  public boolean isUseKettle() {
-    return useKettle;
-  }
-
-  public void setUseKettle(boolean useKettle) {
-    this.useKettle = useKettle;
-  }
-
   public int getBucketId() {
     return bucketId;
   }
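
Aside from dropping the useKettle flag, createCarbonFactDataHandlerModel keeps the MDKey index selection shown in the hunk above: when any no-dictionary or complex dimensions exist, their packed byte[] takes one extra slot in the row, so the MDKey sits one position further right. A hedged restatement of that branch:

public class MdKeyIndexRule {
  // Mirrors the branch in the hunk above; parameter names match the diff.
  static int mdKeyIndex(int measureCount, int noDictionaryCount, int complexDimensionCount) {
    return (noDictionaryCount > 0 || complexDimensionCount > 0)
        ? measureCount + 1
        : measureCount;
  }
}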

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 8c94328..68f9bd5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -95,8 +95,6 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
   private char[] aggType;
 
-  private boolean useKettle;
-
   /**
    * below code is to check whether dimension
    * is of no dictionary type or not
@@ -105,7 +103,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
   public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName,
       int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
-      char[] aggType, boolean[] isNoDictionaryColumn, boolean useKettle) {
+      char[] aggType, boolean[] isNoDictionaryColumn) {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
     this.dimensionCount = dimensionCount;
@@ -114,7 +112,6 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     this.aggType = aggType;
     this.noDictionaryCount = noDictionaryCount;
     this.isNoDictionaryColumn = isNoDictionaryColumn;
-    this.useKettle = useKettle;
   }
 
   /**
@@ -183,8 +180,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
           // create chunk holder
           SortTempFileChunkHolder sortTempFileChunkHolder =
               new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
-                  measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn,
-                  useKettle);
+                  measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn);
 
           // initialize
           sortTempFileChunkHolder.initialize();
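
After this change SingleThreadFinalSortFilesMerger is constructed without the trailing useKettle flag. A usage sketch of the new constructor; only the parameter order comes from the diff, the argument values are hypothetical:

import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;

public class MergerWiring {
  static SingleThreadFinalSortFilesMerger build() {
    char[] aggType = {'n', 'n'};                           // hypothetical per-measure agg types
    boolean[] isNoDictionaryColumn = {false, false, true}; // hypothetical per-dimension flags
    return new SingleThreadFinalSortFilesMerger(
        "/tmp/carbon/sorttemp",  // tempFileLocation
        "sales",                 // tableName
        3,                       // dimensionCount
        0,                       // complexDimensionCount
        2,                       // measureCount
        1,                       // noDictionaryCount
        aggType, isNoDictionaryColumn); // note: no useKettle parameter any more
  }
}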

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
new file mode 100644
index 0000000..ddd9bf2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store.file;
+
+
+public class FileData extends FileManager {
+
+  /**
+   * Store Path
+   */
+  private String storePath;
+
+  /**
+   * @param fileName  carbon data file name
+   * @param storePath store path for this file
+   */
+  public FileData(String fileName, String storePath) {
+    this.fileName = fileName;
+    this.storePath = storePath;
+  }
+
+  /**
+   * @return Returns the fileName.
+   */
+  public String getFileName() {
+    return fileName;
+  }
+
+  /**
+   * @return Returns the storePath.
+   */
+  public String getStorePath() {
+    return storePath;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
new file mode 100644
index 0000000..cfa3a66
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store.file;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public class FileManager implements IFileManagerComposite {
+  /**
+   * listOfFileData, composite parent which holds the different objects
+   */
+  protected List<IFileManagerComposite> listOfFileData =
+      new ArrayList<IFileManagerComposite>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+  protected String fileName;
+
+  @Override public void add(IFileManagerComposite customData) {
+    listOfFileData.add(customData);
+  }
+
+  @Override public void remove(IFileManagerComposite customData) {
+    listOfFileData.remove(customData);
+
+  }
+
+  @Override public IFileManagerComposite get(int i) {
+    return listOfFileData.get(i);
+  }
+
+  @Override public void setName(String name) {
+    this.fileName = name;
+  }
+
+  /**
+   * Return the size
+   */
+  public int size() {
+    return listOfFileData.size();
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
new file mode 100644
index 0000000..6691772
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store.file;
+
+public interface IFileManagerComposite {
+  /**
+   * Add the data, which can be either a folder (composite) or a file (leaf)
+   *
+   * @param customData
+   */
+  void add(IFileManagerComposite customData);
+
+  /**
+   * Remove the CustomData type object from the IFileManagerComposite object hierarchy.
+   *
+   * @param customData
+   */
+  void remove(IFileManagerComposite customData);
+
+  /**
+   * Get the CustomData type object at the given index
+   *
+   * @return CustomDataIntf type
+   */
+  IFileManagerComposite get(int i);
+
+  /**
+   * set the CustomData type object name
+   *
+   * @param name
+   */
+  void setName(String name);
+
+  /**
+   * Get the size
+   *
+   * @return
+   */
+  int size();
+
+}
+
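
Taken together, the three new files form a small composite pattern: IFileManagerComposite is the component interface, FileManager is the composite node, and FileData is a leaf that extends FileManager and so inherits the child-list operations. A short usage sketch against exactly the API shown above; the file and path names are hypothetical:

import org.apache.carbondata.processing.store.file.FileData;
import org.apache.carbondata.processing.store.file.FileManager;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;

public class FileManagerDemo {
  public static void main(String[] args) {
    IFileManagerComposite manager = new FileManager();
    // Register a data file the way the writer does through FileData.
    manager.add(new FileData("part-0-0.carbondata", "/store/default/sales"));
    FileData first = (FileData) manager.get(0);
    System.out.println(first.getFileName() + " -> " + first.getStorePath());
    System.out.println("tracked files: " + manager.size());
  }
}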

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a73b356..cda907c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -66,7 +66,7 @@ import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.IndexHeader;
-import org.apache.carbondata.processing.mdkeygen.file.FileData;
+import org.apache.carbondata.processing.store.file.FileData;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 import org.apache.commons.lang3.ArrayUtils;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 9ed0baa..ce53ec8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -20,8 +20,8 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 
 /**
  * Value object for writing the data

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
deleted file mode 100644
index 86b63df..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
-
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.writer.HierarchyValueWriterForCSV;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
-import org.apache.carbondata.processing.schema.metadata.ArrayWrapper;
-import org.apache.carbondata.processing.schema.metadata.ColumnSchemaDetails;
-import org.apache.carbondata.processing.schema.metadata.ColumnsInfo;
-
-import org.pentaho.di.core.exception.KettleException;
-
-public abstract class CarbonCSVBasedDimSurrogateKeyGen {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonCSVBasedDimSurrogateKeyGen.class.getName());
-  /**
-   * max
-   */
-  protected int[] max;
-  /**
-   * connection
-   */
-  protected Connection connection;
-  /**
-   * hierInsertFileNames
-   */
-  protected Map<String, String> hierInsertFileNames;
-  /**
-   * dimInsertFileNames
-   */
-  protected String[] dimInsertFileNames;
-  /**
-   * columnsInfo
-   */
-  protected ColumnsInfo columnsInfo;
-  /**
-   * primary key max surrogate key map
-   */
-  protected Map<String, Integer> primaryKeysMaxSurroagetMap;
-  /**
-   * Measure max surrogate key map
-   */
-  protected Map<String, Integer> measureMaxSurroagetMap;
-  /**
-   * File manager
-   */
-  protected IFileManagerComposite fileManager;
-
-  /**
-   * Cache should be a map only, because multiple levels can map to the same
-   * database column. In that case duplicate storage should be avoided.
-   */
-  private Map<String, Dictionary> dictionaryCaches;
-  /**
-   * Year Cache
-   */
-  private Map<String, Map<String, Integer>> timeDimCache;
-  /**
-   * dimsFiles
-   */
-  private String[] dimsFiles;
-  /**
-   * timeDimMax
-   */
-  private int[] timDimMax;
-  /**
-   * hierCache
-   */
-  private Map<String, Map<Integer, int[]>> hierCache =
-      new HashMap<String, Map<Integer, int[]>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  /**
-   * hierCacheReverse
-   */
-  private Map<String, Map<ArrayWrapper, Integer>> hierCacheReverse =
-      new HashMap<String, Map<ArrayWrapper, Integer>>(
-          CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  /**
-   * dimension ordinal to dimension mapping
-   */
-  private CarbonDimension[] dimensionOrdinalToDimensionMapping;
-  /**
-   * rwLock2
-   */
-  private ReentrantReadWriteLock rwLock2 = new ReentrantReadWriteLock();
-  /**
-   * wLock2
-   */
-  protected Lock wLock2 = rwLock2.writeLock();
-  /**
-   * Store Folder Name with Load number.
-   */
-  private String storeFolderWithLoadNumber;
-
-  /**
-   * @param columnsInfo ColumnsInfo With all the required details for surrogate key generation and
-   *                    hierarchy entries.
-   */
-  public CarbonCSVBasedDimSurrogateKeyGen(ColumnsInfo columnsInfo) {
-    this.columnsInfo = columnsInfo;
-
-    setDimensionTables(columnsInfo.getDimColNames());
-    setHierFileNames(columnsInfo.getHierTables());
-  }
-
-  /**
-   * @param tuple         The string value whose surrogate key will be generated.
-   * @param tabColumnName The key of the dictionaryCaches Map, for example "tablename_columnname"
-   */
-  public Integer generateSurrogateKeys(String tuple, String tabColumnName) throws KettleException {
-    Integer key = null;
-    Dictionary dicCache = dictionaryCaches.get(tabColumnName);
-    key = dicCache.getSurrogateKey(tuple);
-    return key;
-  }
-
-  /**
-   * @param tuple         The string value whose surrogate key will be generated.
-   * @param tabColumnName The key of the dictionaryCaches Map, for example "tablename_columnname"
-   */
-  public Integer generateSurrogateKeys(String tuple, String tabColumnName, String columnId)
-      throws KettleException {
-    Integer key = null;
-    Dictionary dicCache = dictionaryCaches.get(tabColumnName);
-    if (null == dicCache) {
-      ColumnSchemaDetails columnSchemaDetails =
-          this.columnsInfo.getColumnSchemaDetailsWrapper().get(columnId);
-      if (columnSchemaDetails.isDirectDictionary()) {
-        DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(columnSchemaDetails.getColumnType());
-        key = directDictionaryGenerator.generateDirectSurrogateKey(tuple);
-      }
-    } else {
-      key = dicCache.getSurrogateKey(tuple);
-    }
-    return key;
-  }
-
-
-  public Integer generateSurrogateKeysForTimeDims(String tuple, String columnName, int index,
-      Object[] props) throws KettleException {
-    Integer key = null;
-    Dictionary dicCache = dictionaryCaches.get(columnName);
-    key = dicCache.getSurrogateKey(tuple);
-    if (key == null) {
-      if (timDimMax[index] >= columnsInfo.getMaxKeys()[index]) {
-        if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(tuple)) {
-          tuple = null;
-        }
-        LOGGER.error("Invalid cardinality. Key size exceeded cardinality for: " + columnsInfo
-                .getDimColNames()[index] + ": MemberValue: " + tuple);
-        return -1;
-      }
-      timDimMax[index]++;
-      Map<String, Integer> timeCache = timeDimCache.get(columnName);
-      // Extract properties from tuple
-      // Need to create a new surrogate key.
-      key = getSurrogateFromStore(tuple, index, props);
-      if (null != timeCache) {
-        timeCache.put(tuple, key);
-      }
-    } else {
-      return updateSurrogateToStore(tuple, columnName, index, key, props);
-    }
-    return key;
-  }
-
-  public void checkNormalizedHierExists(int[] val, String hier,
-      HierarchyValueWriterForCSV hierWriter) throws KettleException {
-    Map<ArrayWrapper, Integer> cache = hierCacheReverse.get(hier);
-
-    ArrayWrapper wrapper = new ArrayWrapper(val);
-    Integer hCache = cache.get(wrapper);
-    if (hCache != null) {
-      return;
-    } else {
-      wLock2.lock();
-      try {
-        getNormalizedHierFromStore(val, hier, 1, hierWriter);
-        // Store in cache
-        cache.put(wrapper, 1);
-      } finally {
-        wLock2.unlock();
-      }
-    }
-  }
-
-  public void close() throws Exception {
-    if (null != connection) {
-      connection.close();
-    }
-  }
-
-  /**
-   * Search entry and insert if not found in store.
-   *
-   * @param val        surrogate key values of the hierarchy levels
-   * @param hier       hierarchy name
-   * @param primaryKey primary key for the entry
-   * @param hierWriter writer used to persist the hierarchy entry
-   * @return the serialized hierarchy entry
-   * @throws KettleException
-   */
-  protected abstract byte[] getNormalizedHierFromStore(int[] val, String hier, int primaryKey,
-      HierarchyValueWriterForCSV hierWriter) throws KettleException;
-
-  /**
-   * Search entry and insert if not found in store.
-   *
-   * @param value      member value whose surrogate key is required
-   * @param index      index of the dimension column
-   * @param properties ordinal column, name column and all other properties
-   * @return the newly created surrogate key
-   * @throws KettleException
-   */
-  protected abstract int getSurrogateFromStore(String value, int index, Object[] properties)
-      throws KettleException;
-
-  /**
-   * Search entry and insert if not found in store.
-   *
-   * @param value      member value whose surrogate key is to be updated
-   * @param columnName name of the column
-   * @param index      index of the dimension column
-   * @param key        existing surrogate key found in the cache
-   * @param properties ordinal column, name column and all other properties
-   * @return the updated surrogate key
-   * @throws KettleException
-   */
-  protected abstract int updateSurrogateToStore(String value, String columnName, int index, int key,
-      Object[] properties) throws KettleException;
-
-  /**
-   * Generate the surrogate key for a measure value.
-   *
-   * @return the surrogate key for the measure value
-   * @throws KettleException
-   */
-  public abstract int getSurrogateForMeasure(String tuple, String columnName)
-      throws KettleException;
-
-  private Map<Integer, int[]> getHCache(String hName) {
-    Map<Integer, int[]> hCache = hierCache.get(hName);
-    if (hCache == null) {
-      hCache = new HashMap<Integer, int[]>();
-      hierCache.put(hName, hCache);
-    }
-
-    return hCache;
-  }
-
-  private Map<ArrayWrapper, Integer> getHCacheReverse(String hName) {
-    Map<ArrayWrapper, Integer> hCache = hierCacheReverse.get(hName);
-    if (hCache == null) {
-      hCache = new HashMap<ArrayWrapper, Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-      hierCacheReverse.put(hName, hCache);
-    }
-
-    return hCache;
-  }
-
-  private void setHierFileNames(Set<String> set) {
-    hierInsertFileNames =
-        new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (String s : set) {
-      hierInsertFileNames.put(s, s + CarbonCommonConstants.HIERARCHY_FILE_EXTENSION);
-
-      // pre-create both caches to avoid the "hierStream is null" issue
-      getHCache(s);
-      getHCacheReverse(s);
-    }
-  }
-
-  private void setDimensionTables(String[] dimeFileNames) {
-    int noOfPrimitiveDims = 0;
-    List<String> dimFilesForPrimitives = new ArrayList<String>();
-    List<Boolean> isDirectDictionary = new ArrayList<Boolean>();
-    dictionaryCaches = new ConcurrentHashMap<String, Dictionary>();
-    for (int i = 0; i < dimeFileNames.length; i++) {
-      GenericDataType complexType = columnsInfo.getComplexTypesMap()
-          .get(dimeFileNames[i].substring(columnsInfo.getTableName().length() + 1));
-      if (complexType != null) {
-        List<GenericDataType> primitiveChild = new ArrayList<GenericDataType>();
-        complexType.getAllPrimitiveChildren(primitiveChild);
-        for (GenericDataType eachPrimitive : primitiveChild) {
-          dimFilesForPrimitives.add(
-              columnsInfo.getTableName() + CarbonCommonConstants.UNDERSCORE + eachPrimitive
-                  .getName());
-          eachPrimitive.setSurrogateIndex(noOfPrimitiveDims);
-          noOfPrimitiveDims++;
-          ColumnSchemaDetails columnSchemaDetails =
-              columnsInfo.getColumnSchemaDetailsWrapper().get(eachPrimitive.getColumnId());
-          if (columnSchemaDetails.isDirectDictionary()) {
-            isDirectDictionary.add(true);
-          } else {
-            isDirectDictionary.add(false);
-          }
-        }
-      } else {
-        dimFilesForPrimitives.add(dimeFileNames[i]);
-        noOfPrimitiveDims++;
-        isDirectDictionary.add(false);
-      }
-    }
-    max = new int[noOfPrimitiveDims];
-    for (int i = 0; i < isDirectDictionary.size(); i++) {
-      if (isDirectDictionary.get(i)) {
-        max[i] = Integer.MAX_VALUE;
-      }
-    }
-    this.dimsFiles = dimFilesForPrimitives.toArray(new String[dimFilesForPrimitives.size()]);
-
-    createRespectiveDimFilesForDimTables();
-  }
-
-  private void createRespectiveDimFilesForDimTables() {
-    int dimCount = this.dimsFiles.length;
-    dimInsertFileNames = new String[dimCount];
-    System.arraycopy(dimsFiles, 0, dimInsertFileNames, 0, dimCount);
-  }
-
-  /**
-   * Checks whether the dictionary caches are already filled for the given columns.
-   *
-   * @param columnNames names of the columns to check
-   * @return true if the caches are filled for all the given columns
-   */
-  public abstract boolean isCacheFilled(String[] columnNames);
-
-  /**
-   * @return Returns the storeFolderWithLoadNumber.
-   */
-  public String getStoreFolderWithLoadNumber() {
-    return storeFolderWithLoadNumber;
-  }
-
-  /**
-   * @param storeFolderWithLoadNumber The storeFolderWithLoadNumber to set.
-   */
-  public void setStoreFolderWithLoadNumber(String storeFolderWithLoadNumber) {
-    this.storeFolderWithLoadNumber = storeFolderWithLoadNumber;
-  }
-
-  /**
-   * @return Returns the dictionaryCaches.
-   */
-  public Map<String, Dictionary> getDictionaryCaches() {
-    return dictionaryCaches;
-  }
-
-  /**
-   * @param dictionaryCaches The dictionaryCaches to set.
-   */
-  public void setDictionaryCaches(Map<String, Dictionary> dictionaryCaches) {
-    this.dictionaryCaches = dictionaryCaches;
-  }
-
-  /**
-   * @return Returns the timeDimCache.
-   */
-  public Map<String, Map<String, Integer>> getTimeDimCache() {
-    return timeDimCache;
-  }
-
-  /**
-   * @param timeDimCache The timeDimCache to set.
-   */
-  public void setTimeDimCache(Map<String, Map<String, Integer>> timeDimCache) {
-    this.timeDimCache = timeDimCache;
-  }
-
-  /**
-   * @return Returns the dimsFiles.
-   */
-  public String[] getDimsFiles() {
-    return dimsFiles;
-  }
-
-  /**
-   * @param dimsFiles The dimsFiles to set.
-   */
-  public void setDimsFiles(String[] dimsFiles) {
-    this.dimsFiles = dimsFiles;
-  }
-
-  /**
-   * @return Returns the hierCache.
-   */
-  public Map<String, Map<Integer, int[]>> getHierCache() {
-    return hierCache;
-  }
-
-  /**
-   * @param hierCache The hierCache to set.
-   */
-  public void setHierCache(Map<String, Map<Integer, int[]>> hierCache) {
-    this.hierCache = hierCache;
-  }
-
-  /**
-   * @return Returns the timDimMax.
-   */
-  public int[] getTimDimMax() {
-    return timDimMax;
-  }
-
-  /**
-   * @param timDimMax The timDimMax to set.
-   */
-  public void setTimDimMax(int[] timDimMax) {
-    this.timDimMax = timDimMax;
-  }
-
-  /**
-   * @return the hierCacheReverse
-   */
-  public Map<String, Map<ArrayWrapper, Integer>> getHierCacheReverse() {
-    return hierCacheReverse;
-  }
-
-  /**
-   * @param hierCacheReverse the hierCacheReverse to set
-   */
-  public void setHierCacheReverse(Map<String, Map<ArrayWrapper, Integer>> hierCacheReverse) {
-    this.hierCacheReverse = hierCacheReverse;
-  }
-
-  public int[] getMax() {
-    return max;
-  }
-
-  public void setMax(int[] max) {
-    this.max = max;
-  }
-
-  /**
-   * @return the measureMaxSurroagetMap
-   */
-  public Map<String, Integer> getMeasureMaxSurroagetMap() {
-    return measureMaxSurroagetMap;
-  }
-
-  /**
-   * @param measureMaxSurroagetMap the measureMaxSurroagetMap to set
-   */
-  public void setMeasureMaxSurroagetMap(Map<String, Integer> measureMaxSurroagetMap) {
-    this.measureMaxSurroagetMap = measureMaxSurroagetMap;
-  }
-
-  /**
-   * @return the dimensionOrdinalToDimensionMapping
-   */
-  public CarbonDimension[] getDimensionOrdinalToDimensionMapping() {
-    return dimensionOrdinalToDimensionMapping;
-  }
-
-  /**
-   * @param dimensionOrdinalToDimensionMapping the mapping to set
-   */
-  public void setDimensionOrdinalToDimensionMapping(
-      CarbonDimension[] dimensionOrdinalToDimensionMapping) {
-    this.dimensionOrdinalToDimensionMapping = dimensionOrdinalToDimensionMapping;
-  }
-}
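
The reverse hierarchy cache above keys its map by ArrayWrapper instead of by
the raw int[] of level surrogates, because Java arrays use identity-based
equals/hashCode and would never match on lookup. A minimal, self-contained
sketch of that wrapper pattern (an illustration only, not CarbonData's own
ArrayWrapper; all names below are invented for the demo):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class IntArrayKeyDemo {
  // Value-based wrapper so equal int[] contents hash and compare equal.
  static final class IntArrayKey {
    private final int[] values;
    IntArrayKey(int[] values) { this.values = values; }
    @Override public boolean equals(Object o) {
      return o instanceof IntArrayKey && Arrays.equals(values, ((IntArrayKey) o).values);
    }
    @Override public int hashCode() { return Arrays.hashCode(values); }
  }

  public static void main(String[] args) {
    Map<IntArrayKey, Integer> cache = new HashMap<IntArrayKey, Integer>();
    cache.put(new IntArrayKey(new int[] {10, 20, 30}), 1);
    // A distinct array with equal contents still hits the cached entry,
    // mirroring the lookup done in checkNormalizedHierExists above:
    System.out.println(cache.get(new IntArrayKey(new int[] {10, 20, 30}))); // 1
    // With raw int[] keys this lookup would return null (identity equality).
  }
}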

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenData.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenData.java
deleted file mode 100644
index d78dc1a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenData.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-
-import org.pentaho.di.core.row.RowMetaInterface;
-import org.pentaho.di.trans.step.BaseStepData;
-import org.pentaho.di.trans.step.StepDataInterface;
-
-public class CarbonCSVBasedSeqGenData extends BaseStepData implements StepDataInterface {
-
-  /**
-   * outputRowMeta
-   */
-  private RowMetaInterface outputRowMeta;
-
-  /**
-   * surrogateKeyGen
-   */
-  private CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen;
-
-  /**
-   * keyGenerators
-   */
-  private Map<String, KeyGenerator> keyGenerators =
-      new HashMap<String, KeyGenerator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  /**
-   * columnIndex
-   */
-  private Map<String, int[]> columnIndex =
-      new HashMap<String, int[]>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  /**
-   * precomputed default objects
-   */
-  private Object[] defaultObjects;
-
-  /**
-   * generator
-   */
-  private KeyGenerator generator;
-
-  /**
-   * the size (number of fields) of the input rows
-   */
-  private int inputSize;
-
-
-  public CarbonCSVBasedSeqGenData() {
-    super();
-  }
-
-  /**
-   * @return Returns the surrogateKeyGen.
-   */
-  public CarbonCSVBasedDimSurrogateKeyGen getSurrogateKeyGen() {
-    return surrogateKeyGen;
-  }
-
-  /**
-   * @param surrogateKeyGen The surrogateKeyGen to set.
-   */
-  public void setSurrogateKeyGen(CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen) {
-    this.surrogateKeyGen = surrogateKeyGen;
-  }
-
-  /**
-   * @param inputSize The inputSize to set.
-   */
-  public void setInputSize(int inputSize) {
-    this.inputSize = inputSize;
-  }
-
-  /**
-   * @param generator The generator to set.
-   */
-  public void setGenerator(KeyGenerator generator) {
-    this.generator = generator;
-  }
-
-  /**
-   * @return Returns the keyGenerators.
-   */
-  public Map<String, KeyGenerator> getKeyGenerators() {
-    return keyGenerators;
-  }
-
-  /**
-   * @return Returns the outputRowMeta.
-   */
-  public RowMetaInterface getOutputRowMeta() {
-    return outputRowMeta;
-  }
-
-  /**
-   * @param outputRowMeta The outputRowMeta to set.
-   */
-  public void setOutputRowMeta(RowMetaInterface outputRowMeta) {
-    this.outputRowMeta = outputRowMeta;
-  }
-
-  public void clean() {
-    outputRowMeta = null;
-
-    surrogateKeyGen = null;
-
-    generator = null;
-    keyGenerators = null;
-
-    columnIndex = null;
-
-    defaultObjects = null;
-
-  }
-}
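
The clean() method above releases the holder's references so the large caches
can be garbage collected even if the step-data object itself remains reachable
after the step run. A minimal sketch of the same release-references pattern,
with assumed names rather than CarbonData's types:

import java.util.HashMap;
import java.util.Map;

// Per-run scratch data; the holder itself may outlive the step run.
public class StepScratchDataDemo {
  private Map<String, int[]> columnIndex = new HashMap<String, int[]>();
  private Object[] defaultObjects = new Object[16];

  // Called when the step finishes: dropping the references makes the
  // caches collectible even while the holder is still reachable.
  public void clean() {
    columnIndex = null;
    defaultObjects = null;
  }

  public static void main(String[] args) {
    StepScratchDataDemo data = new StepScratchDataDemo();
    data.clean(); // after this, the map and array are eligible for GC
  }
}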