You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/03/30 05:12:14 UTC
[06/13] incubator-carbondata git commit: Removed kettle related code and refactored
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/merger/step/CarbonSliceMergerStepMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/step/CarbonSliceMergerStepMeta.java b/processing/src/main/java/org/apache/carbondata/processing/merger/step/CarbonSliceMergerStepMeta.java
deleted file mode 100644
index 6030a30..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/step/CarbonSliceMergerStepMeta.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.merger.step;
-
-import java.util.List;
-import java.util.Map;
-
-import org.pentaho.di.core.CheckResult;
-import org.pentaho.di.core.CheckResultInterface;
-import org.pentaho.di.core.Counter;
-import org.pentaho.di.core.database.DatabaseMeta;
-import org.pentaho.di.core.exception.KettleException;
-import org.pentaho.di.core.exception.KettleXMLException;
-import org.pentaho.di.core.row.RowMetaInterface;
-import org.pentaho.di.core.xml.XMLHandler;
-import org.pentaho.di.i18n.BaseMessages;
-import org.pentaho.di.repository.ObjectId;
-import org.pentaho.di.repository.Repository;
-import org.pentaho.di.trans.Trans;
-import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.BaseStepMeta;
-import org.pentaho.di.trans.step.StepDataInterface;
-import org.pentaho.di.trans.step.StepInterface;
-import org.pentaho.di.trans.step.StepMeta;
-import org.pentaho.di.trans.step.StepMetaInterface;
-import org.w3c.dom.Node;
-
-public class CarbonSliceMergerStepMeta extends BaseStepMeta
- implements StepMetaInterface, Cloneable {
-
- /**
- * for i18n purposes
- */
- private static final Class<?> PKG = CarbonSliceMergerStepMeta.class;
-
- /**
- * table name
- */
- private String tabelName;
-
- /**
- * mdkey size
- */
- private String mdkeySize;
-
- /**
- * measureCount
- */
- private String measureCount;
-
- /**
- * heirAndKeySize
- */
- private String heirAndKeySize;
-
- /**
- * databaseName
- */
- private String databaseName;
-
- /**
- * tableName
- */
- private String tableName;
-
- /**
- * isGroupByEnabled
- */
- private String groupByEnabled;
-
- /**
- * aggregatorString
- */
- private String aggregatorString;
-
- /**
- * aggregatorClassString
- */
- private String aggregatorClassString;
-
- /**
- * factDimLensString
- */
- private String factDimLensString;
-
- private String levelAnddataTypeString;
- /**
- * partitionID
- */
- private String partitionID;
- /**
- * Id of the load folder
- */
- private String segmentId;
- /**
- * task id, each spark task has a unique id
- */
- private String taskNo;
-
- /**
- * CarbonDataWriterStepMeta constructor to initialize this class
- */
- public CarbonSliceMergerStepMeta() {
- super();
- }
-
- /**
- * set the default value for all the properties
- */
- @Override public void setDefault() {
- tabelName = "";
- mdkeySize = "";
- measureCount = "";
- heirAndKeySize = "";
- tableName = "";
- databaseName = "";
- groupByEnabled = "";
- aggregatorClassString = "";
- aggregatorString = "";
- factDimLensString = "";
- levelAnddataTypeString = "";
- partitionID = "";
- segmentId = "";
- taskNo = "";
- }
-
- /**
- * Get the XML that represents the values in this step
- *
- * @return the XML that represents the metadata in this step
- * @throws KettleException in case there is a conversion or XML encoding error
- */
- public String getXML() {
- StringBuilder retval = new StringBuilder(150);
- retval.append(" ").append(XMLHandler.addTagValue("TableName", tabelName));
- retval.append(" ").append(XMLHandler.addTagValue("MDKeySize", mdkeySize));
- retval.append(" ").append(XMLHandler.addTagValue("Measurecount", measureCount));
- retval.append(" ").append(XMLHandler.addTagValue("HeirAndKeySize", heirAndKeySize));
- retval.append(" ").append(XMLHandler.addTagValue("tableName", tableName));
- retval.append(" ").append(XMLHandler.addTagValue("databaseName", databaseName));
- retval.append(" ").append(XMLHandler.addTagValue("isGroupByEnabled", groupByEnabled));
- retval.append(" ")
- .append(XMLHandler.addTagValue("aggregatorClassString", aggregatorClassString));
- retval.append(" ").append(XMLHandler.addTagValue("aggregatorString", aggregatorString));
- retval.append(" ").append(XMLHandler.addTagValue("factDimLensString", factDimLensString));
- retval.append(" ")
- .append(XMLHandler.addTagValue("levelAnddataTypeString", levelAnddataTypeString));
- retval.append(" ").append(XMLHandler.addTagValue("partitionID", partitionID));
- retval.append(" ").append(XMLHandler.addTagValue("segmentId", segmentId));
- retval.append(" ").append(XMLHandler.addTagValue("taskNo", taskNo));
- return retval.toString();
- }
-
- /**
- * Load the values for this step from an XML Node
- *
- * @param stepnode the Node to get the info from
- * @param databases The available list of databases to reference to
- * @param counters Counters to reference.
- * @throws KettleXMLException When an unexpected XML error occurred. (malformed etc.)
- */
- @Override public void loadXML(Node stepnode, List<DatabaseMeta> databases,
- Map<String, Counter> counters) throws KettleXMLException {
- try {
- databaseName = XMLHandler.getTagValue(stepnode, "databaseName");
- tabelName = XMLHandler.getTagValue(stepnode, "TableName");
- mdkeySize = XMLHandler.getTagValue(stepnode, "MDKeySize");
- measureCount = XMLHandler.getTagValue(stepnode, "Measurecount");
- heirAndKeySize = XMLHandler.getTagValue(stepnode, "HeirAndKeySize");
- tableName = XMLHandler.getTagValue(stepnode, "tableName");
- groupByEnabled = XMLHandler.getTagValue(stepnode, "isGroupByEnabled");
- aggregatorClassString = XMLHandler.getTagValue(stepnode, "aggregatorClassString");
- aggregatorString = XMLHandler.getTagValue(stepnode, "aggregatorString");
- factDimLensString = XMLHandler.getTagValue(stepnode, "factDimLensString");
- levelAnddataTypeString = XMLHandler.getTagValue(stepnode, "levelAnddataTypeString");
- partitionID = XMLHandler.getTagValue(stepnode, "partitionID");
- segmentId = XMLHandler.getTagValue(stepnode, "segmentId");
- taskNo = XMLHandler.getTagValue(stepnode, "taskNo");
- } catch (Exception e) {
- throw new KettleXMLException("Unable to read step info from XML node", e);
- }
- }
-
- /**
- * Save the steps data into a Kettle repository
- *
- * @param rep The Kettle repository to save to
- * @param idTransformation The transformation ID
- * @param idStep The step ID
- * @throws KettleException When an unexpected error occurred (database, network, etc)
- */
- @Override public void saveRep(Repository rep, ObjectId idTransformation, ObjectId idStep)
- throws KettleException {
- try {
- rep.saveStepAttribute(idTransformation, idStep, "TableName", tabelName); //$NON-NLS-1$
- rep.saveStepAttribute(idTransformation, idStep, "MDKeySize", mdkeySize); //$NON-NLS-1$
- rep.saveStepAttribute(idTransformation, idStep, "Measurecount", measureCount);
- rep.saveStepAttribute(idTransformation, idStep, "HeirAndKeySize",
- heirAndKeySize); //$NON-NLS-1$
- rep.saveStepAttribute(idTransformation, idStep, "tableName", tableName); //$NON-NLS-1$
- rep.saveStepAttribute(idTransformation, idStep, "databaseName", databaseName); //$NON-NLS-1$
- rep.saveStepAttribute(idTransformation, idStep, "isGroupByEnabled", groupByEnabled);
- rep.saveStepAttribute(idTransformation, idStep, "aggregatorClassString",
- aggregatorClassString);
- rep.saveStepAttribute(idTransformation, idStep, "aggregatorString", aggregatorString);
- rep.saveStepAttribute(idTransformation, idStep, "factDimLensString", factDimLensString);
- rep.saveStepAttribute(idTransformation, idStep, "levelAnddataTypeString",
- levelAnddataTypeString);
- rep.saveStepAttribute(idTransformation, idStep, "partitionID", partitionID);
- rep.saveStepAttribute(idTransformation, idStep, "segmentId", segmentId);
- rep.saveStepAttribute(idTransformation, idStep, "taskNo", taskNo);
- } catch (Exception e) {
- throw new KettleException(
- BaseMessages.getString(PKG, "TemplateStep.Exception.UnableToSaveStepInfoToRepository")
- + idStep, e);
- }
- }
-
- /**
- * Make an exact copy of this step, make sure to explicitly copy Collections
- * etc.
- *
- * @return an exact copy of this step
- */
- public Object clone() {
- Object retval = super.clone();
- return retval;
- }
-
- /**
- * Read the steps information from a Kettle repository
- *
- * @param rep The repository to read from
- * @param idStep The step ID
- * @param databases The databases to reference
- * @param counters The counters to reference
- * @throws KettleException When an unexpected error occurred (database, network, etc)
- */
- @Override public void readRep(Repository rep, ObjectId idStep, List<DatabaseMeta> databases,
- Map<String, Counter> counters) throws KettleException {
- try {
- tabelName = rep.getStepAttributeString(idStep, "TableName");
- mdkeySize = rep.getStepAttributeString(idStep, "MDKeySize");
- measureCount = rep.getStepAttributeString(idStep, "Measurecount");
- heirAndKeySize = rep.getStepAttributeString(idStep, "HeirAndKeySize");
- databaseName = rep.getStepAttributeString(idStep, "databaseName");
- tableName = rep.getStepAttributeString(idStep, "tableName");
- groupByEnabled = rep.getStepAttributeString(idStep, "isGroupByEnabled");
- aggregatorClassString = rep.getStepAttributeString(idStep, "aggregatorClassString");
- aggregatorString = rep.getStepAttributeString(idStep, "aggregatorString");
- factDimLensString = rep.getStepAttributeString(idStep, "factDimLensString");
- levelAnddataTypeString = rep.getStepAttributeString(idStep, "levelAnddataTypeString");
- partitionID = rep.getStepAttributeString(idStep, "partitionID");
- segmentId = rep.getStepAttributeString(idStep, "segmentId");
- taskNo = rep.getStepAttributeString(idStep, "taskNo");
- } catch (Exception exception) {
- throw new KettleException(BaseMessages
- .getString(PKG, "CarbonDataWriterStepMeta.Exception.UnexpectedErrorInReadingStepInfo"),
- exception);
- }
-
- }
-
- /**
- * Get the executing step, needed by Trans to launch a step.
- *
- * @param stepMeta The step info
- * @param stepDataInterface the step data interface linked to this step. Here the step can
- * store temporary data, database connections, etc.
- * @param copyNr The copy nr to get
- * @param transMeta The transformation info
- * @param trans The launching transformation
- */
- @Override public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface,
- int copyNr, TransMeta transMeta, Trans trans) {
- return new CarbonSliceMergerStep(stepMeta, stepDataInterface, copyNr, transMeta, trans);
- }
-
- /**
- * Checks the settings of this step and puts the findings in a remarks List.
- *
- * @param remarks The list to put the remarks in @see
- * org.pentaho.di.core.CheckResult
- * @param stepMeta The stepMeta to help checking
- * @param prev The fields coming from the previous step
- * @param input The input step names
- * @param output The output step names
- * @param info The fields that are used as information by the step
- */
- @Override public void check(List<CheckResultInterface> remarks, TransMeta transMeta,
- StepMeta stepMeta, RowMetaInterface prev, String[] input, String[] output,
- RowMetaInterface info) {
-
- CheckResult checkResVal;
-
- // See if we have input streams leading to this step!
- if (input.length > 0) {
- checkResVal =
- new CheckResult(CheckResult.TYPE_RESULT_OK, "Step is receiving info from other steps.",
- stepMeta);
- remarks.add(checkResVal);
- } else {
- checkResVal =
- new CheckResult(CheckResult.TYPE_RESULT_ERROR, "No input received from other steps!",
- stepMeta);
- remarks.add(checkResVal);
- }
-
- }
-
- /**
- * Get a new instance of the appropriate data class. This data class
- * implements the StepDataInterface. It basically contains the persisting
- * data that needs to live on, even if a worker thread is terminated.
- *
- * @return The appropriate StepDataInterface class.
- */
- @Override public StepDataInterface getStepData() {
- return new CarbonSliceMergerStepData();
- }
-
- /**
- * This method will return the table name
- *
- * @return tabelName
- */
- public String getTabelName() {
- return tabelName;
- }
-
- /**
- * This method will set the table name
- *
- * @param tabelName
- */
- public void setTabelName(String tabelName) {
- this.tabelName = tabelName;
- }
-
- /**
- * This method will be used to set the mdkey
- *
- * @param mdkeySize
- */
- public void setMdkeySize(String mdkeySize) {
- this.mdkeySize = mdkeySize;
- }
-
- /**
- * This method will be used to set the measure count
- *
- * @param measureCount
- */
- public void setMeasureCount(String measureCount) {
- this.measureCount = measureCount;
- }
-
- /**
- * This method will be used to set the heir and key size string
- *
- * @param heirAndKeySize
- */
- public void setHeirAndKeySize(String heirAndKeySize) {
- this.heirAndKeySize = heirAndKeySize;
- }
-
- /**
- * @return the databaseName
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * @param databaseName the databaseName to set
- */
- public void setDatabaseName(String databaseName) {
- this.databaseName = databaseName;
- }
-
- /**
- * @return the tableName
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * @param tableName the tableName to set
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @param isGroupByEnabled the isGroupByEnabled to set
- */
- public void setGroupByEnabled(String isGroupByEnabled) {
- this.groupByEnabled = isGroupByEnabled;
- }
-
- /**
- * @param aggregatorString the aggregatorString to set
- */
- public void setAggregatorString(String aggregatorString) {
- this.aggregatorString = aggregatorString;
- }
-
- /**
- * @param aggregatorClassString the aggregatorClassString to set
- */
- public void setAggregatorClassString(String aggregatorClassString) {
- this.aggregatorClassString = aggregatorClassString;
- }
-
- /**
- * @param factDimLensString1 the factDimLensString to set
- */
- public void setFactDimLensString(String factDimLensString1) {
- this.factDimLensString = factDimLensString1;
- }
-
- public void setLevelAnddataTypeString(String levelAnddataTypeString) {
- this.levelAnddataTypeString = levelAnddataTypeString;
- }
-
- /**
- * @param partitionID
- */
- public void setPartitionID(String partitionID) {
- this.partitionID = partitionID;
- }
-
- /**
- * set segment Id
- * @param segmentId
- */
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- /**
- * @param taskNo
- */
- public void setTaskNo(String taskNo) {
- this.taskNo = taskNo;
- }
-
- /**
- * @return
- */
- public String getTaskNo() {
- return taskNo;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 433f8db..ad96578 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -85,7 +85,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
sortParameters.getDimColCount(),
sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
- sortParameters.getNoDictionaryDimnesionColumn(), sortParameters.isUseKettle());
+ sortParameters.getNoDictionaryDimnesionColumn());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 813d83d..e3049d2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -139,8 +139,7 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
- sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
- sortParameters.isUseKettle());
+ sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn());
return finalMerger;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index 7eacd08..f10e73a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -134,7 +134,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
"Error while initializing data handler : " + e.getMessage());
} catch (Exception e) {
LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
- throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
+ throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ArrayWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ArrayWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ArrayWrapper.java
deleted file mode 100644
index 27c3718..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ArrayWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-public class ArrayWrapper implements Serializable {
-
- /**
- * Comment for <code>serialVersionUID</code>
- */
- private static final long serialVersionUID = -2016551342632572869L;
-
- /**
- * data
- */
- private int[] data;
-
- public ArrayWrapper(int[] data) {
- if (data == null) {
- throw new IllegalArgumentException();
- }
- this.data = data;
- }
-
- @Override public boolean equals(Object other) {
- if (other instanceof ArrayWrapper) {
- return Arrays.equals(data, ((ArrayWrapper) other).data);
- } else {
- return false;
- }
-
- }
-
- @Override public int hashCode() {
- return Arrays.hashCode(data);
- }
-
- public int[] getData() {
- return data;
- }
-
- public void setData(int[] data) {
- this.data = data;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnSchemaDetails.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnSchemaDetails.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnSchemaDetails.java
deleted file mode 100644
index 2484002..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnSchemaDetails.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/**
- * Class holds the common column schema details needed for the data load
- */
-public class ColumnSchemaDetails {
-
- /**
- * column Name
- */
- private String columnName;
- /**
- * column datatype
- */
- private DataType columnType;
- /**
- * boolean to identify direct dictionary column
- */
- private Boolean isDirectDictionary;
-
- /**
- * Constructor to initialize object from the input string separated by comma (,)
- *
- * @param input
- */
- ColumnSchemaDetails(String input) {
- String[] splits = input.split(",");
- columnName = splits[0];
- columnType = DataTypeUtil.getDataType(splits[1]);
- isDirectDictionary = Boolean.parseBoolean(splits[2]);
- }
-
- /**
- * Constructor to initialize the ColumnSchemaDetails
- *
- * @param columnName
- * @param columnType
- * @param isDirectDictionary
- */
- public ColumnSchemaDetails(String columnName, DataType columnType, Boolean isDirectDictionary) {
- this.columnName = columnName;
- this.columnType = columnType;
- this.isDirectDictionary = isDirectDictionary;
-
- }
-
- /**
- * returns the ColumnName
- *
- * @return
- */
- public String getColumnName() {
- return columnName;
- }
-
- /**
- * returns the dataType of the column
- *
- * @return
- */
- public DataType getColumnType() {
- return columnType;
- }
-
- /**
- * returns boolean value to identify direct dictionary
- *
- * @return
- */
- public Boolean isDirectDictionary() {
- return isDirectDictionary;
- }
-
- /**
- * @return
- */
- public String toString() {
- return columnName + "," + columnType + "," + isDirectDictionary;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnSchemaDetailsWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnSchemaDetailsWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnSchemaDetailsWrapper.java
deleted file mode 100644
index 554dd06..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnSchemaDetailsWrapper.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-/**
- * Wrapper class to hold the columnschema details
- */
-public class ColumnSchemaDetailsWrapper {
-
- /**
- * Map of the ColumnSchemaDetails
- */
- private Map<String, ColumnSchemaDetails> columnSchemaDetailsMap;
-
- /**
- * return the string object
- *
- * @return
- */
- public String toString() {
- StringBuilder builder = new StringBuilder();
- Set<Map.Entry<String, ColumnSchemaDetails>> entries = columnSchemaDetailsMap.entrySet();
- Iterator<Map.Entry<String, ColumnSchemaDetails>> iterator = entries.iterator();
-
- while (iterator.hasNext()) {
- Map.Entry<String, ColumnSchemaDetails> entry = iterator.next();
- builder.append(entry.getKey());
- builder.append(CarbonCommonConstants.HASH_SPC_CHARACTER);
- builder.append(entry.getValue().toString());
- if (iterator.hasNext()) {
- builder.append(CarbonCommonConstants.HASH_SPC_CHARACTER);
- }
- }
- return builder.toString();
- }
-
- /**
- * default constructor
- */
- public ColumnSchemaDetailsWrapper() {
-
- }
-
- /**
- * Constructor take serialized string as input and populates the List of columnschema details
- *
- * @param input
- */
- public ColumnSchemaDetailsWrapper(String input) {
- columnSchemaDetailsMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- String[] split = input.split(CarbonCommonConstants.HASH_SPC_CHARACTER);
- for (int i = 0; i < split.length; i++) {
- String key = split[i++];
- ColumnSchemaDetails details = new ColumnSchemaDetails(split[i]);
- columnSchemaDetailsMap.put(key, details);
- }
- }
-
- /**
- * returns ColumnSchemaDetails of all columns
- *
- * @return
- */
- public Map<String, ColumnSchemaDetails> getColumnSchemaDetailsMap() {
- return columnSchemaDetailsMap;
- }
-
- /**
- * sets the map of column schema
- *
- * @param columnSchemaDetailsMap
- */
- public void setColumnSchemaDetailsMap(Map<String, ColumnSchemaDetails> columnSchemaDetailsMap) {
- this.columnSchemaDetailsMap = columnSchemaDetailsMap;
- }
-
- /**
- * returns the columnSchemaDetails of requested column
- *
- * @param key
- * @return
- */
- public ColumnSchemaDetails get(String key) {
- return columnSchemaDetailsMap.get(key);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnsInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnsInfo.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnsInfo.java
deleted file mode 100644
index b8b8bcd..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/ColumnsInfo.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-
-public class ColumnsInfo {
-
- /**
- * Indices for dimensions in the record. Doesn't include any properties.
- */
- private int[] dims;
-
- /**
- * Map<HierTableName, KeyGenerator>
- */
- private Map<String, KeyGenerator> keyGenerators;
-
- /**
- * Hierarchy table names (Same will be file names for file store or
- * incremental load)
- */
- private Set<String> hierTables;
-
- /**
- * Batch size configured in transformation
- */
- private int batchSize;
-
- /**
- * To decide it is data load for aggregate table or not.
- */
- private boolean isAggregateLoad;
-
- /**
- * Store type DB or file based ?
- */
- private String storeType;
-
- /**
- * column Names for dimensions. Which will be used as table name for store
- */
- private String[] dimColNames;
-
- /**
- * baseStoreLocation
- */
- private String baseStoreLocation;
-
- /**
- * Maximum possible surrogate key for dimension possible based on
- * cardinality value in schema definition
- */
- private int[] maxKeys;
-
- /**
- * Dimension Index, Properties indices in the tuple.
- * [0] - [2,3] - 2 Props at indices 2 & 3 [1] - [4,7,8] - 3 props at indices
- * 4,7, & 8 [2] - [] - No props
- */
- private int[][] propIndx;
-
- /**
- * Dimension Index, Property column names from table.
- * [0] - [col2,col3] [1] - [col4,col7,col8] [2] - []
- */
- private List<String>[] propColumns;
-
- /**
- * timDimIndex
- */
- private int timDimIndex;
-
- /**
- * timDimIndexEnd
- */
- private int timDimIndexEnd;
-
- /**
- * timeOrdinalIndices
- */
- private int[] timeOrdinalIndices;
-
- /**
- * timeOrdinalCols
- */
- private String[] timeOrdinalCols;
-
- /**
- * propTypes
- */
- private List<String>[] propTypes;
-
- /**
- * dimHierRel
- */
- private String[] dimHierRel;
-
- /**
- * tableName
- */
- private String tableName;
-
- /**
- * Primary key Map
- */
- private Map<String, Boolean> primaryKeyMap;
-
- /**
- * measureColumns
- */
- private String[] measureColumns;
-
- private boolean[] dimsPresent;
-
- private String databaseName;
-
- private Map<String, GenericDataType> complexTypesMap;
-
- /**
- * column Ids of dimensions in a table
- */
- private String[] dimensionColumnIds;
-
- /**
- * wrapper object having the columnSchemaDetails
- */
- private ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper;
-
- private Map<String, Map<String, String>> columnProperties;
-
- public Map<String, GenericDataType> getComplexTypesMap() {
- return complexTypesMap;
- }
-
- public void setComplexTypesMap(Map<String, GenericDataType> complexTypesMap) {
- this.complexTypesMap = complexTypesMap;
- }
-
- /**
- * @return Returns the dims.
- */
- public int[] getDims() {
- return dims;
- }
-
- /**
- * @param dims The dims to set.
- */
- public void setDims(int[] dims) {
- this.dims = dims;
- }
-
- /**
- * @return Returns the keyGenerators.
- */
- public Map<String, KeyGenerator> getKeyGenerators() {
- return keyGenerators;
- }
-
- /**
- * @param keyGenerators The keyGenerators to set.
- */
- public void setKeyGenerators(Map<String, KeyGenerator> keyGenerators) {
- this.keyGenerators = keyGenerators;
- }
-
- /**
- * @return Returns the tableName.
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * @param tableName The tableName to set.
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @return Returns the hierTables.
- */
- public Set<String> getHierTables() {
- return hierTables;
- }
-
- /**
- * @param hierTables The hierTables to set.
- */
- public void setHierTables(Set<String> hierTables) {
- this.hierTables = hierTables;
- }
-
- /**
- * @param batchSize The batchSize to set.
- */
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- public void setAggregateLoad(boolean isAggregateLoad) {
- this.isAggregateLoad = isAggregateLoad;
- }
-
- /**
- * @param storeType The storeType to set.
- */
- public void setStoreType(String storeType) {
- this.storeType = storeType;
- }
-
- /**
- * @return Returns the dimColNames.
- */
- public String[] getDimColNames() {
- return dimColNames;
- }
-
- /**
- * @param dimColNames The dimColNames to set.
- */
- public void setDimColNames(String[] dimColNames) {
- this.dimColNames = dimColNames;
- }
-
- /**
- * @return Returns the maxKeys.
- */
- public int[] getMaxKeys() {
- return maxKeys;
- }
-
- /**
- * @param maxKeys The maxKeys to set.
- */
- public void setMaxKeys(int[] maxKeys) {
- this.maxKeys = maxKeys;
- }
-
- /**
- * @return Returns the propIndx.
- */
- public int[][] getPropIndx() {
- return propIndx;
- }
-
- /**
- * @param propIndx The propIndx to set.
- */
- public void setPropIndx(int[][] propIndx) {
- this.propIndx = propIndx;
- }
-
- /**
- * @param propColumns The propColumns to set.
- */
- public void setPropColumns(List<String>[] propColumns) {
- this.propColumns = propColumns;
- }
-
- /**
- * @return Returns the timDimIndex.
- */
- public int getTimDimIndex() {
- return timDimIndex;
- }
-
- /**
- * @param timDimIndex The timDimIndex to set.
- */
- public void setTimDimIndex(int timDimIndex) {
- this.timDimIndex = timDimIndex;
- }
-
- /**
- * @return Returns the timDimIndexEnd.
- */
- public int getTimDimIndexEnd() {
- return timDimIndexEnd;
- }
-
- /**
- * @return Returns the timeOrdinalIndices.
- */
- public int[] getTimeOrdinalIndices() {
- return timeOrdinalIndices;
- }
-
- /**
- * @param timeOrdinalIndices The timeOrdinalIndices to set.
- */
- public void setTimeOrdinalIndices(int[] timeOrdinalIndices) {
- this.timeOrdinalIndices = timeOrdinalIndices;
- }
-
- /**
- * @param timeOrdinalCols The timeOrdinalCols to set.
- */
- public void setTimeOrdinalCols(String[] timeOrdinalCols) {
- this.timeOrdinalCols = timeOrdinalCols;
- }
-
- /**
- * @param propTypes The propTypes to set.
- */
- public void setPropTypes(List<String>[] propTypes) {
- this.propTypes = propTypes;
- }
-
- /**
- * @return Returns the baseStoreLocation.
- */
- public String getBaseStoreLocation() {
- return baseStoreLocation;
- }
-
- /**
- * @param baseStoreLocation The baseStoreLocation to set.
- */
- public void setBaseStoreLocation(String baseStoreLocation) {
- this.baseStoreLocation = baseStoreLocation;
- }
-
- /**
- * @param dimHierRel The dimHierRel to set.
- */
- public void setDimHierRel(String[] dimHierRel) {
- this.dimHierRel = dimHierRel;
- }
-
- /**
- * @return Returns the primaryKeyMap.
- */
- public Map<String, Boolean> getPrimaryKeyMap() {
- return primaryKeyMap;
- }
-
- /**
- * @param primaryKeyMap The primaryKeyMap to set.
- */
- public void setPrimaryKeyMap(Map<String, Boolean> primaryKeyMap) {
- this.primaryKeyMap = primaryKeyMap;
- }
-
- /**
- * getDimsPresent
- *
- * @return boolean[]
- */
- public boolean[] getDimsPresent() {
- return dimsPresent;
- }
-
- /**
- * @param measureColumns The measureColumns to set.
- */
- public void setMeasureColumns(String[] measureColumns) {
- this.measureColumns = measureColumns;
- }
-
- public String getDatabaseName() {
- return databaseName;
- }
-
- public void setDatabaseName(String databaseName) {
- this.databaseName = databaseName;
- }
-
- /**
- * @return column Ids
- */
- public String[] getDimensionColumnIds() {
- return dimensionColumnIds;
- }
-
- /**
- * @param dimensionColumnIds column Ids for dimensions in a table
- */
- public void setDimensionColumnIds(String[] dimensionColumnIds) {
- this.dimensionColumnIds = dimensionColumnIds;
- }
-
- /**
- * returns wrapper object having the columnSchemaDetails
- *
- * @return
- */
- public ColumnSchemaDetailsWrapper getColumnSchemaDetailsWrapper() {
- return columnSchemaDetailsWrapper;
- }
-
- /**
- * set the wrapper object having the columnSchemaDetails
- *
- * @param columnSchemaDetailsWrapper
- */
- public void setColumnSchemaDetailsWrapper(ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper) {
- this.columnSchemaDetailsWrapper = columnSchemaDetailsWrapper;
- }
-
- public void setColumnProperties(Map<String, Map<String, String>> columnProperties) {
- this.columnProperties = columnProperties;
- }
-
- public Map<String, String> getColumnProperties(String columnName) {
- return this.columnProperties.get(columnName);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/HierarchiesInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/HierarchiesInfo.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/HierarchiesInfo.java
deleted file mode 100644
index 32f096d..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/HierarchiesInfo.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-import java.util.Map;
-
-public class HierarchiesInfo {
-
- /**
- * hierarichieName
- */
- private String hierarichieName;
-
- /**
- * columnPropMap
- */
- private Map<String, String[]> columnPropMap;
-
- public String getHierarichieName() {
- return hierarichieName;
- }
-
- public void setHierarichieName(String hierarichieName) {
- this.hierarichieName = hierarichieName;
- }
-
- public Map<String, String[]> getColumnPropMap() {
- return columnPropMap;
- }
-
- public void setColumnPropMap(Map<String, String[]> columnPropMap) {
- this.columnPropMap = columnPropMap;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/TableOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/TableOption.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/TableOption.java
deleted file mode 100644
index c01d800..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/TableOption.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-/**
- * This class is to hold the key value pair of properties needed while dataload.
- */
-public class TableOption {
- /**
- * option key name
- */
- private String optionKey;
- /**
- * option key value
- */
- private String optionValue;
-
- /**
- * the constructor to initialize the key value pair TableOption instance
- *
- * @param optionKey
- * @param optionValue
- */
- public TableOption(String optionKey, String optionValue) {
- this.optionKey = optionKey;
- this.optionValue = optionValue;
- }
-
- /**
- * constructor to init from te string separated by comma(,)
- *
- * @param str
- */
- public TableOption(String str) {
- //passing 2 to split the key value pair having empty value for the corresponding key.
- String[] split = str.split(",", 2);
- this.optionKey = split[0];
- this.optionValue = split[1];
- }
-
- /**
- * returns options key
- *
- * @return
- */
- public String getOptionKey() {
- return optionKey;
- }
-
- /**
- * returns options value
- *
- * @return
- */
- public String getOptionValue() {
- return optionValue;
- }
-
- /**
- * @return
- */
- public String toString() {
- return optionKey + "," + optionValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/TableOptionWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/TableOptionWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/TableOptionWrapper.java
deleted file mode 100644
index 02f797a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/TableOptionWrapper.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-/**
- * The class hold the table option details being used while dataload
- */
-public class TableOptionWrapper {
- /**
- * map holds the table options
- */
- private static final Map<String, TableOption> mapOFOptions =
- new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- private static TableOptionWrapper tableOptionWrapper = new TableOptionWrapper();
-
- /**
- * to initialize the wrapper object
- */
- private TableOptionWrapper() {
- }
-
- /**
- * @param input
- */
- public static void populateTableOptions(String input) {
- String[] split =
- null != input ? input.split(CarbonCommonConstants.HASH_SPC_CHARACTER) : new String[0];
- for (String str : split) {
- TableOption tableOption = new TableOption(str);
- mapOFOptions.put(tableOption.getOptionKey(), tableOption);
- }
- }
-
- /**
- * @param input
- */
- public static void setTableOption(String input) {
- if (null != input) {
- TableOption tableOption = new TableOption(input);
- mapOFOptions.put(tableOption.getOptionKey(), tableOption);
- }
- }
-
- /**
- * returns TableOptionWrapper instance
- *
- * @return
- */
- public static TableOptionWrapper getTableOptionWrapperInstance() {
- return tableOptionWrapper;
- }
-
- /**
- * returns the options key value
- * return null if the key is not found in the map
- *
- * @param key
- * @return
- */
- public String get(String key) {
- TableOption tableOption = mapOFOptions.get(key);
- return null != tableOption ? tableOption.getOptionValue() : null;
- }
-
- /**
- * return the string object
- *
- * @return
- */
- public String toString() {
- StringBuilder builder = new StringBuilder();
- Set<Map.Entry<String, TableOption>> entries = mapOFOptions.entrySet();
- Iterator<Map.Entry<String, TableOption>> iterator = entries.iterator();
-
- while (iterator.hasNext()) {
- Map.Entry<String, TableOption> entry = iterator.next();
- builder.append(entry.getValue().toString());
- builder.append(CarbonCommonConstants.HASH_SPC_CHARACTER);
- }
- return builder.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 1b16675..0ac2d5c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -87,8 +87,6 @@ public class IntermediateFileMerger implements Callable<Void> {
private File outPutFile;
- private boolean useKettle;
-
private boolean[] noDictionarycolumnMapping;
/**
@@ -100,7 +98,6 @@ public class IntermediateFileMerger implements Callable<Void> {
this.fileCounter = intermediateFiles.length;
this.intermediateFiles = intermediateFiles;
this.outPutFile = outPutFile;
- this.useKettle = mergerParameters.isUseKettle();
noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
}
@@ -111,14 +108,8 @@ public class IntermediateFileMerger implements Callable<Void> {
try {
startSorting();
initialize();
- if (useKettle) {
- while (hasNext()) {
- writeDataTofile(next());
- }
- } else {
- while (hasNext()) {
- writeDataTofileWithOutKettle(next());
- }
+ while (hasNext()) {
+ writeDataTofile(next());
}
if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
if (entryCount > 0) {
@@ -260,8 +251,7 @@ public class IntermediateFileMerger implements Callable<Void> {
new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
- mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(),
- mergerParameters.isUseKettle());
+ mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn());
// initialize
sortTempFileChunkHolder.initialize();
@@ -283,7 +273,7 @@ public class IntermediateFileMerger implements Callable<Void> {
*/
private void createRecordHolderQueue(File[] listFiles) {
// creating record holder heap
- this.recordHolderHeap = new PriorityQueue<SortTempFileChunkHolder>(listFiles.length);
+ this.recordHolderHeap = new PriorityQueue<>(listFiles.length);
}
/**
@@ -309,8 +299,6 @@ public class IntermediateFileMerger implements Callable<Void> {
/**
* Below method will be used to write data to file
*
- * TODO Remove it after kettle is removed
- *
* @throws CarbonSortKeyAndGroupByException problem while writing
*/
private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
@@ -330,71 +318,6 @@ public class IntermediateFileMerger implements Callable<Void> {
return;
}
try {
- int fieldIndex = 0;
- char[] aggType = mergerParameters.getAggType();
-
- for (int counter = 0; counter < mergerParameters.getDimColCount(); counter++) {
- stream.writeInt((Integer) NonDictionaryUtil.getDimension(fieldIndex++, row));
- }
-
- // added for high card also
- if ((mergerParameters.getNoDictionaryCount() + mergerParameters
- .getComplexDimColCount()) > 0) {
- stream.write(NonDictionaryUtil.getByteArrayForNoDictionaryCols(row));
- }
-
- fieldIndex = 0;
- for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
- if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
- stream.write((byte) 1);
- if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) {
- Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeDouble(val);
- } else if (aggType[counter] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
- Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeDouble(val);
- } else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) {
- Long val = (Long) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeLong(val);
- } else if (aggType[counter] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeInt(bigDecimalInBytes.length);
- stream.write(bigDecimalInBytes);
- }
- } else {
- stream.write((byte) 0);
- }
-
- fieldIndex++;
- }
-
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
- }
- }
-
- /**
- * Below method will be used to write data to file
- *
- * @throws CarbonSortKeyAndGroupByException problem while writing
- */
- private void writeDataTofileWithOutKettle(Object[] row) throws CarbonSortKeyAndGroupByException {
- if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
- if (entryCount == 0) {
- records = new Object[totalSize][];
- records[entryCount++] = row;
- return;
- }
-
- records[entryCount++] = row;
- if (entryCount == totalSize) {
- this.writer.writeSortTempFile(records);
- entryCount = 0;
- records = new Object[totalSize][];
- }
- return;
- }
- try {
char[] aggType = mergerParameters.getAggType();
int[] mdkArray = (int[]) row[0];
byte[][] nonDictArray = (byte[][]) row[1];
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 794935d..9b5a850 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
public class SortDataRows {
/**
@@ -193,19 +192,10 @@ public class SortDataRows {
toSort = new Object[entryCount][];
System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
- if (parameters.isUseKettle()) {
- if (parameters.getNoDictionaryCount() > 0) {
- Arrays.sort(toSort, new RowComparator(parameters.getNoDictionaryDimnesionColumn(),
- parameters.getNoDictionaryCount()));
- } else {
- Arrays.sort(toSort, new RowComparatorForNormalDims(parameters.getDimColCount()));
- }
+ if (parameters.getNoDictionaryCount() > 0) {
+ Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
} else {
- if (parameters.getNoDictionaryCount() > 0) {
- Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
- } else {
- Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getDimColCount()));
- }
+ Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getDimColCount()));
}
recordHolderList = toSort;
@@ -233,11 +223,7 @@ public class SortDataRows {
writeSortTempFile(recordHolderList, entryCountLocal, file);
return;
}
- if (parameters.isUseKettle()) {
- writeData(recordHolderList, entryCountLocal, file);
- } else {
- writeDataWithOutKettle(recordHolderList, entryCountLocal, file);
- }
+ writeData(recordHolderList, entryCountLocal, file);
}
private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file)
@@ -258,7 +244,6 @@ public class SortDataRows {
}
}
- // TODO Remove it after kettle got removed
private void writeData(Object[][] recordHolderList, int entryCountLocal, File file)
throws CarbonSortKeyAndGroupByException {
DataOutputStream stream = null;
@@ -269,66 +254,6 @@ public class SortDataRows {
// write number of entries to the file
stream.writeInt(entryCountLocal);
- int dimColCount = parameters.getDimColCount();
- int combinedDimCount = parameters.getNoDictionaryCount() + parameters.getComplexDimColCount();
- char[] aggType = parameters.getAggType();
- Object[] row = null;
- for (int i = 0; i < entryCountLocal; i++) {
- // get row from record holder list
- row = recordHolderList[i];
- int fieldIndex = 0;
-
- for (int dimCount = 0; dimCount < dimColCount; dimCount++) {
- stream.writeInt(NonDictionaryUtil.getDimension(fieldIndex++, row));
- }
-
- // if any high cardinality dims are present then write it to the file.
-
- if (combinedDimCount > 0) {
- stream.write(NonDictionaryUtil.getByteArrayForNoDictionaryCols(row));
- }
-
- // as measures are stored in separate array.
- fieldIndex = 0;
- for (int mesCount = 0; mesCount < parameters.getMeasureColCount(); mesCount++) {
- if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
- stream.write((byte) 1);
- if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
- Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeDouble(val);
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
- Long val = (Long) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeLong(val);
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- BigDecimal val = (BigDecimal) NonDictionaryUtil.getMeasure(fieldIndex, row);
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
- stream.writeInt(bigDecimalInBytes.length);
- stream.write(bigDecimalInBytes);
- }
- } else {
- stream.write((byte) 0);
- }
- fieldIndex++;
- }
- }
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
- } finally {
- // close streams
- CarbonUtil.closeStreams(stream);
- }
- }
-
- private void writeDataWithOutKettle(Object[][] recordHolderList, int entryCountLocal, File file)
- throws CarbonSortKeyAndGroupByException {
- DataOutputStream stream = null;
- try {
- // open stream
- stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
- parameters.getFileWriteBufferSize()));
-
- // write number of entries to the file
- stream.writeInt(entryCountLocal);
int complexDimColCount = parameters.getComplexDimColCount();
int dimColCount = parameters.getDimColCount() + complexDimColCount;
char[] aggType = parameters.getAggType();
@@ -460,23 +385,12 @@ public class SortDataRows {
@Override public Void call() throws Exception {
try {
long startTime = System.currentTimeMillis();
- if (parameters.isUseKettle()) {
- if (parameters.getNoDictionaryCount() > 0) {
- Arrays.sort(recordHolderArray,
- new RowComparator(parameters.getNoDictionaryDimnesionColumn(),
- parameters.getNoDictionaryCount()));
- } else {
- Arrays.sort(recordHolderArray,
- new RowComparatorForNormalDims(parameters.getDimColCount()));
- }
+ if (parameters.getNoDictionaryCount() > 0) {
+ Arrays.sort(recordHolderArray,
+ new NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
} else {
- if (parameters.getNoDictionaryCount() > 0) {
- Arrays.sort(recordHolderArray,
- new NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
- } else {
- Arrays.sort(recordHolderArray,
- new NewRowComparatorForNormalDims(parameters.getDimColCount()));
- }
+ Arrays.sort(recordHolderArray,
+ new NewRowComparatorForNormalDims(parameters.getDimColCount()));
}
// create a new file every time
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index a52ebb2..d42dc32 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -114,11 +114,6 @@ public class SortParameters {
private int numberOfCores;
- /**
- * TODO Temporary conf , it will be removed after kettle removal.
- */
- private boolean useKettle = true;
-
public SortParameters getCopy() {
SortParameters parameters = new SortParameters();
parameters.tempFileLocation = tempFileLocation;
@@ -143,7 +138,6 @@ public class SortParameters {
parameters.taskNo = taskNo;
parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
parameters.numberOfCores = numberOfCores;
- parameters.useKettle = useKettle;
return parameters;
}
@@ -323,14 +317,6 @@ public class SortParameters {
this.numberOfCores = numberOfCores;
}
- public boolean isUseKettle() {
- return useKettle;
- }
-
- public void setUseKettle(boolean useKettle) {
- this.useKettle = useKettle;
- }
-
public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
SortParameters parameters = new SortParameters();
CarbonTableIdentifier tableIdentifier =
@@ -432,7 +418,6 @@ public class SortParameters {
char[] aggType = CarbonDataProcessorUtil
.getAggType(configuration.getMeasureCount(), configuration.getMeasureFields());
parameters.setAggType(aggType);
- parameters.setUseKettle(false);
return parameters;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index fef8c9d..ae01404 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -23,7 +23,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,7 +31,6 @@ import java.util.concurrent.Future;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -134,9 +132,6 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
*/
private boolean[] isNoDictionaryDimensionColumn;
- // TODO temporary configuration, remove after kettle removal
- private boolean useKettle;
-
/**
* Constructor to initialize
*
@@ -151,7 +146,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
*/
public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
int measureCount, int fileBufferSize, int noDictionaryCount, char[] aggType,
- boolean[] isNoDictionaryDimensionColumn, boolean useKettle) {
+ boolean[] isNoDictionaryDimensionColumn) {
// set temp file
this.tempFile = tempFile;
@@ -166,7 +161,6 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
this.executorService = Executors.newFixedThreadPool(1);
this.aggType = aggType;
this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
- this.useKettle = useKettle;
}
/**
@@ -301,83 +295,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
}
/**
- * @return
- * @throws CarbonSortKeyAndGroupByException
- */
- private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
- // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
- if (useKettle) {
- return getRowFromStreamWithKettle();
- } else {
- return getRowFromStreamWithOutKettle();
- }
- }
-
- // TODO remove after kettle flow is removed
- private Object[] getRowFromStreamWithKettle() throws CarbonSortKeyAndGroupByException {
- Object[] holder = new Object[3];
- int index = 0;
- Integer[] dim = new Integer[this.dimensionCount];
- Object[] measures = new Object[this.measureCount];
- byte[] finalByteArr = null;
- try {
-
- // read dimension values
-
- for (int i = 0; i < this.dimensionCount; i++) {
- dim[index++] = stream.readInt();
- }
-
- if ((this.noDictionaryCount + this.complexDimensionCount) > 0) {
- short lengthOfByteArray = stream.readShort();
- ByteBuffer buff = ByteBuffer.allocate(lengthOfByteArray + 2);
- buff.putShort(lengthOfByteArray);
- byte[] byteArr = new byte[lengthOfByteArray];
- stream.readFully(byteArr);
-
- buff.put(byteArr);
- finalByteArr = buff.array();
-
- }
-
- index = 0;
- // read measure values
- for (int i = 0; i < this.measureCount; i++) {
- if (stream.readByte() == 1) {
- if (aggType[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
- measures[index++] = stream.readDouble();
- } else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
- measures[index++] = stream.readLong();
- } else {
- int len = stream.readInt();
- byte[] buff = new byte[len];
- stream.readFully(buff);
- measures[index++] = buff;
- }
- } else {
- measures[index++] = null;
- }
- }
-
- NonDictionaryUtil.prepareOutObj(holder, dim, finalByteArr, measures);
-
- // increment number if record read
- this.numberOfObjectRead++;
- } catch (IOException e) {
- LOGGER.error("Problme while reading the madkey fom sort temp file");
- throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
- }
-
- //return out row
- return holder;
- }
-
- /**
* Reads row from file
* @return Object[]
* @throws CarbonSortKeyAndGroupByException
*/
- private Object[] getRowFromStreamWithOutKettle() throws CarbonSortKeyAndGroupByException {
+ private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
// create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
Object[] holder = new Object[3];
@@ -480,60 +402,6 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
}
@Override public int compareTo(SortTempFileChunkHolder other) {
- if (useKettle) {
- return compareWithKettle(other);
-
- } else {
- return compareWithOutKettle(other);
- }
- }
-
- // TODO Remove after kettle flow is removed.
- private int compareWithKettle(SortTempFileChunkHolder other) {
- int diff = 0;
-
- int normalIndex = 0;
- int noDictionaryindex = 0;
-
- for (boolean isNoDictionary : isNoDictionaryDimensionColumn) {
-
- if (isNoDictionary) {
- byte[] byteArr1 = (byte[]) returnRow[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()];
-
- ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
-
- // extract a high card dims from complete byte[].
- NonDictionaryUtil
- .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
-
- byte[] byteArr2 =
- (byte[]) other.returnRow[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()];
-
- ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
-
- // extract a high card dims from complete byte[].
- NonDictionaryUtil
- .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2);
-
- int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
- if (difference != 0) {
- return difference;
- }
- noDictionaryindex++;
- } else {
- int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, returnRow);
- int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, other.returnRow);
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
- normalIndex++;
- }
- }
- return diff;
- }
-
- private int compareWithOutKettle(SortTempFileChunkHolder other) {
int diff = 0;
int index = 0;
int noDictionaryIndex = 0;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
deleted file mode 100644
index 03e8b25..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdatastep;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.StandardLogService;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.schema.metadata.SortObserver;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
-
-import org.pentaho.di.core.exception.KettleException;
-import org.pentaho.di.core.row.RowMetaInterface;
-import org.pentaho.di.trans.Trans;
-import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.BaseStep;
-import org.pentaho.di.trans.step.StepDataInterface;
-import org.pentaho.di.trans.step.StepMeta;
-import org.pentaho.di.trans.step.StepMetaInterface;
-
-public class SortKeyStep extends BaseStep {
-
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(SortKeyStep.class.getName());
-
- /**
- * CarbonSortKeyAndGroupByStepData
- */
- private SortKeyStepData data;
-
- /**
- * CarbonSortKeyAndGroupByStepMeta
- */
- private SortKeyStepMeta meta;
-
- /**
- * carbonSortKeys
- */
- private SortDataRows sortDataRows;
-
- /**
- * intermediateFileMerger
- */
- private SortIntermediateFileMerger intermediateFileMerger;
-
- /**
- * rowCounter
- */
- private long readCounter;
-
- /**
- * writeCounter
- */
- private long writeCounter;
-
- /**
- * logCounter
- */
- private int logCounter;
-
- /**
- * observer
- */
- private SortObserver observer;
-
- /**
- * To determine whether the column is dictionary or not.
- */
- private boolean[] noDictionaryColMaping;
-
- /**
- * CarbonSortKeyAndGroupByStep Constructor
- *
- * @param stepMeta
- * @param stepDataInterface
- * @param copyNr
- * @param transMeta
- * @param trans
- */
- public SortKeyStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr,
- TransMeta transMeta, Trans trans) {
- super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
- }
-
- /**
- * Perform the equivalent of processing one row. Typically this means
- * reading a row from input (getRow()) and passing a row to output
- * (putRow)).
- *
- * @param smi The steps metadata to work with
- * @param sdi The steps temporary working data to work with (database
- * connections, result sets, caches, temporary variables, etc.)
- * @return false if no more rows can be processed or an error occurred.
- * @throws KettleException
- */
- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
- // get step meta
- this.meta = ((SortKeyStepMeta) smi);
- StandardLogService.setThreadName(meta.getPartitionID(), null);
- // get step data
- this.data = ((SortKeyStepData) sdi);
-
- // get row
- Object[] row = getRow();
-
- // create sort observer
- this.observer = new SortObserver();
-
- // if row is null then this step can start processing the data
- if (row == null) {
- return processRowToNextStep();
- }
-
- // check if all records are null than send empty row to next step
- else if (NonDictionaryUtil.checkAllValuesForNull(row)) {
- // create empty row out size
- int outSize = Integer.parseInt(meta.getOutputRowSize());
-
- Object[] outRow = new Object[outSize];
-
- // clone out row meta
- this.data.setOutputRowMeta((RowMetaInterface) getInputRowMeta().clone());
-
- // get all fields
- this.meta.getFields(data.getOutputRowMeta(), getStepname(), null, null, this);
-
- LOGGER.info("Record Procerssed For table: " + meta.getTabelName());
- LOGGER.info("Record Form Previous Step was null");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 1 + ": Write: " + 1;
- LOGGER.info(logMessage);
-
- putRow(data.getOutputRowMeta(), outRow);
- setOutputDone();
- return false;
- }
-
- // if first
- if (first) {
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordSortRowsStepTotalTime(
- meta.getPartitionID(), System.currentTimeMillis());
- first = false;
-
- // clone out row meta
- this.data.setOutputRowMeta((RowMetaInterface) getInputRowMeta().clone());
-
- // get all fields
- this.meta.getFields(data.getOutputRowMeta(), getStepname(), null, null, this);
-
- this.meta.setNoDictionaryCount(
- NonDictionaryUtil.extractNoDictionaryCount(meta.getNoDictionaryDims()));
-
- this.noDictionaryColMaping =
- NonDictionaryUtil.convertStringToBooleanArr(meta.getNoDictionaryDimsMapping());
- SortParameters parameters =
- SortParameters.createSortParameters(meta.getDatabaseName(), meta.getTabelName(),
- meta.getDimensionCount(), meta.getComplexDimensionCount(), meta.getMeasureCount(),
- meta.getNoDictionaryCount(), meta.getPartitionID(),
- meta.getSegmentId() + "", meta.getTaskNo(), this.noDictionaryColMaping);
- intermediateFileMerger = new SortIntermediateFileMerger(parameters);
- this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
- try {
- // initialize sort
- this.sortDataRows.initialize();
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new KettleException(e);
- }
-
- this.logCounter = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.DATA_LOAD_LOG_COUNTER,
- CarbonCommonConstants.DATA_LOAD_LOG_COUNTER_DEFAULT_COUNTER));
- }
-
- readCounter++;
- if (readCounter % logCounter == 0) {
- LOGGER.info("Record Procerssed For table: " + meta.getTabelName());
- String logMessage = "Carbon Sort Key Step: Record Read: " + readCounter;
- LOGGER.info(logMessage);
- }
-
- try {
- // add row
- this.sortDataRows.addRow(row);
- writeCounter++;
- } catch (Throwable e) {
- LOGGER.error(e);
- throw new KettleException(e);
- }
-
- return true;
- }
-
- /**
- * Below method will be used to process data to next step
- *
- * @return false is finished
- * @throws KettleException
- */
- private boolean processRowToNextStep() throws KettleException {
- if (null == this.sortDataRows) {
- LOGGER.info("Record Processed For table: " + meta.getTabelName());
- LOGGER.info("Number of Records was Zero");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
- LOGGER.info(logMessage);
- putRow(data.getOutputRowMeta(), new Object[0]);
- setOutputDone();
- return false;
- }
-
- try {
- // start sorting
- this.sortDataRows.startSorting();
- this.intermediateFileMerger.finish();
-
- // check any more rows are present
- LOGGER.info("Record Processed For table: " + meta.getTabelName());
- String logMessage =
- "Summary: Carbon Sort Key Step: Read: " + readCounter + ": Write: " + writeCounter;
- LOGGER.info(logMessage);
- putRow(data.getOutputRowMeta(), new Object[0]);
- setOutputDone();
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordSortRowsStepTotalTime(
- meta.getPartitionID(), System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValuesTotalTime(
- meta.getPartitionID(), System.currentTimeMillis());
- return false;
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new KettleException(e);
- }
-
- }
-
- /**
- * Initialize and do work where other steps need to wait for...
- *
- * @param smi The metadata to work with
- * @param sdi The data to initialize
- * @return step initialize or not
- */
- public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
- this.meta = ((SortKeyStepMeta) smi);
- this.data = ((SortKeyStepData) sdi);
- return super.init(smi, sdi);
- }
-
- /**
- * Dispose of this step: close files, empty logs, etc.
- *
- * @param smi The metadata to work with
- * @param sdi The data to dispose of
- */
- public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
- this.meta = ((SortKeyStepMeta) smi);
- this.data = ((SortKeyStepData) sdi);
- this.sortDataRows = null;
- super.dispose(smi, sdi);
- this.meta = null;
- this.data = null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepData.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepData.java
deleted file mode 100644
index 1a9859f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepData.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdatastep;
-
-import org.pentaho.di.core.row.RowMetaInterface;
-import org.pentaho.di.trans.step.BaseStepData;
-import org.pentaho.di.trans.step.StepDataInterface;
-
-public class SortKeyStepData extends BaseStepData implements StepDataInterface {
-
- /**
- * outputRowMeta
- */
- private RowMetaInterface outputRowMeta;
-
- /**
- * rowMeta
- */
- private RowMetaInterface rowMeta;
-
- public RowMetaInterface getOutputRowMeta() {
- return outputRowMeta;
- }
-
- public void setOutputRowMeta(RowMetaInterface outputRowMeta) {
- this.outputRowMeta = outputRowMeta;
- }
-
- public RowMetaInterface getRowMeta() {
- return rowMeta;
- }
-
- public void setRowMeta(RowMetaInterface rowMeta) {
- this.rowMeta = rowMeta;
- }
-}
\ No newline at end of file