You are viewing a plain-text version of this content. The canonical link to the original page (referred to as "here") was not preserved in this copy.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/03/30 05:12:13 UTC
[05/13] incubator-carbondata git commit: Removed kettle-related code and refactored
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
deleted file mode 100644
index 2b54812..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdatastep;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.pentaho.di.core.CheckResultInterface;
-import org.pentaho.di.core.Counter;
-import org.pentaho.di.core.database.DatabaseMeta;
-import org.pentaho.di.core.exception.KettleException;
-import org.pentaho.di.core.exception.KettleXMLException;
-import org.pentaho.di.core.row.RowMetaInterface;
-import org.pentaho.di.core.xml.XMLHandler;
-import org.pentaho.di.i18n.BaseMessages;
-import org.pentaho.di.repository.ObjectId;
-import org.pentaho.di.repository.Repository;
-import org.pentaho.di.trans.Trans;
-import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.BaseStepMeta;
-import org.pentaho.di.trans.step.StepDataInterface;
-import org.pentaho.di.trans.step.StepInterface;
-import org.pentaho.di.trans.step.StepMeta;
-import org.pentaho.di.trans.step.StepMetaInterface;
-import org.w3c.dom.Node;
-
-public class SortKeyStepMeta extends BaseStepMeta implements StepMetaInterface {
- /**
- * PKG
- */
- private static final Class<?> PKG = SortKeyStepMeta.class;
-
- /**
- * tabelName
- */
- private String tabelName;
-
- /**
- * outputRowSize
- */
- private String outputRowSize;
-
- /**
- * tableName
- */
- private String tableName;
-
- /**
- * databaseName
- */
- private String databaseName;
-
- /**
- * Dimension Count
- */
- private String dimensionCount;
-
- /**
- * ComplexTypes Count
- */
- private String complexDimensionCount;
-
- /**
- * Dimension Count
- */
- private int noDictionaryCount;
-
- /**
- * measureCount
- */
- private String measureCount;
-
- private String factDimLensString;
-
- /**
- * isUpdateMemberRequest
- */
- private String updateMemberRequest;
-
- private String measureDataType;
-
- private String noDictionaryDims;
- /**
- * partitionID
- */
- private String partitionID;
- /**
- * Id of the load folder
- */
- private String segmentId;
- /**
- * task id, each spark task has a unique id
- */
- private String taskNo;
- /**
- * To determine the column whether is dictionary or not.
- */
- private String noDictionaryDimsMapping;
-
- /**
- * set the default value for all the properties
- */
- @Override public void setDefault() {
- this.tabelName = "";
- factDimLensString = "";
- outputRowSize = "";
- databaseName = "";
- noDictionaryDims = "";
- noDictionaryDimsMapping = "";
- tableName = "";
- dimensionCount = "";
- complexDimensionCount = "";
- measureCount = "";
- updateMemberRequest = "";
- measureDataType = "";
- partitionID = "";
- segmentId = "";
- taskNo = "";
- }
-
- /**
- * Get the XML that represents the values in this step
- *
- * @return the XML that represents the metadata in this step
- * @throws KettleException in case there is a conversion or XML encoding error
- */
- public String getXML() {
- StringBuilder retval = new StringBuilder(150);
- retval.append(" ").append(XMLHandler.addTagValue("TableName", this.tabelName));
- retval.append(" ").append(XMLHandler.addTagValue("factDimLensString", factDimLensString));
- retval.append(" ").append(XMLHandler.addTagValue("outputRowSize", this.outputRowSize));
- retval.append(" ").append(XMLHandler.addTagValue("tableName", this.tableName));
- retval.append(" ").append(XMLHandler.addTagValue("databaseName", this.databaseName));
- retval.append(" ").append(XMLHandler.addTagValue("dimensionCount", this.dimensionCount));
- retval.append(" ").append(XMLHandler.addTagValue("noDictionaryDims", this.noDictionaryDims));
- retval.append(" ")
- .append(XMLHandler.addTagValue("noDictionaryDimsMapping", this.noDictionaryDimsMapping));
- retval.append(" ")
- .append(XMLHandler.addTagValue("complexDimensionCount", this.complexDimensionCount));
- retval.append(" ").append(XMLHandler.addTagValue("measureCount", this.measureCount));
- retval.append(" ")
- .append(XMLHandler.addTagValue("isUpdateMemberRequest", this.updateMemberRequest));
- retval.append(" ").append(XMLHandler.addTagValue("measureDataType", measureDataType));
- retval.append(" ").append(XMLHandler.addTagValue("partitionID", partitionID));
- retval.append(" ").append(XMLHandler.addTagValue("segmentId", segmentId));
- retval.append(" ").append(XMLHandler.addTagValue("taskNo", taskNo));
- return retval.toString();
- }
-
- /**
- * Load the values for this step from an XML Node
- *
- * @param stepnode the Node to get the info from
- * @param databases The available list of databases to reference to
- * @param counters Counters to reference.
- * @throws KettleXMLException When an unexpected XML error occurred. (malformed etc.)
- */
- public void loadXML(Node stepnode, List<DatabaseMeta> databases, Map<String, Counter> counters)
- throws KettleXMLException {
- try {
- this.tabelName = XMLHandler.getTagValue(stepnode, "TableName");
- this.outputRowSize = XMLHandler.getTagValue(stepnode, "outputRowSize");
- this.factDimLensString = XMLHandler.getTagValue(stepnode, "factDimLensString");
- this.tableName = XMLHandler.getTagValue(stepnode, "tableName");
- this.databaseName = XMLHandler.getTagValue(stepnode, "databaseName");
- this.dimensionCount = XMLHandler.getTagValue(stepnode, "dimensionCount");
- this.noDictionaryDims = XMLHandler.getTagValue(stepnode, "noDictionaryDims");
- this.noDictionaryDimsMapping = XMLHandler.getTagValue(stepnode, "noDictionaryDimsMapping");
- this.complexDimensionCount = XMLHandler.getTagValue(stepnode, "complexDimensionCount");
- this.measureCount = XMLHandler.getTagValue(stepnode, "measureCount");
- this.updateMemberRequest = XMLHandler.getTagValue(stepnode, "isUpdateMemberRequest");
- this.measureDataType = XMLHandler.getTagValue(stepnode, "measureDataType");
- this.partitionID = XMLHandler.getTagValue(stepnode, "partitionID");
- this.segmentId = XMLHandler.getTagValue(stepnode, "segmentId");
- this.taskNo = XMLHandler.getTagValue(stepnode, "taskNo");
- } catch (Exception e) {
- throw new KettleXMLException("Unable to read step info from XML node", e);
- }
- }
-
- /**
- * Save the steps data into a Kettle repository
- *
- * @param rep The Kettle repository to save to
- * @param idTransformation The transformation ID
- * @param idStep The step ID
- * @throws KettleException When an unexpected error occurred (database, network, etc)
- */
- public void saveRep(Repository rep, ObjectId idTransformation, ObjectId idStep)
- throws KettleException {
- try {
- rep.saveStepAttribute(idTransformation, idStep, "TableName", this.tabelName);
-
- rep.saveStepAttribute(idTransformation, idStep, "factDimLensString", factDimLensString);
- rep.saveStepAttribute(idTransformation, idStep, "outputRowSize", this.outputRowSize);
- rep.saveStepAttribute(idTransformation, idStep, "tableName", this.tableName);
- rep.saveStepAttribute(idTransformation, idStep, "databaseName", this.databaseName);
- rep.saveStepAttribute(idTransformation, idStep, "dimensionCount", this.dimensionCount);
- rep.saveStepAttribute(idTransformation, idStep, "noDictionaryDims", this.noDictionaryDims);
- rep.saveStepAttribute(idTransformation, idStep, "noDictionaryDimsMapping",
- this.noDictionaryDimsMapping);
- rep.saveStepAttribute(idTransformation, idStep, "complexDimensionCount",
- this.complexDimensionCount);
- rep.saveStepAttribute(idTransformation, idStep, "measureCount", this.measureCount);
- rep.saveStepAttribute(idTransformation, idStep, "isUpdateMemberRequest",
- this.updateMemberRequest);
- rep.saveStepAttribute(idTransformation, idStep, "measureDataType", measureDataType);
- rep.saveStepAttribute(idTransformation, idStep, "partitionID", partitionID);
- rep.saveStepAttribute(idTransformation, idStep, "segmentId", segmentId);
- rep.saveStepAttribute(idTransformation, idStep, "taskNo", taskNo);
- } catch (Exception e) {
- throw new KettleException(BaseMessages
- .getString(PKG, "TemplateStep.Exception.UnableToSaveStepInfoToRepository", new String[0])
- + idStep, e);
- }
- }
-
- /**
- * Read the steps information from a Kettle repository
- *
- * @param rep The repository to read from
- * @param idStep The step ID
- * @param databases The databases to reference
- * @param counters The counters to reference
- * @throws KettleException When an unexpected error occurred (database, network, etc)
- */
- public void readRep(Repository rep, ObjectId idStep, List<DatabaseMeta> databases,
- Map<String, Counter> counters) throws KettleException {
- try {
- this.tabelName = rep.getStepAttributeString(idStep, "TableName");
- this.outputRowSize = rep.getStepAttributeString(idStep, "outputRowSize");
- this.databaseName = rep.getStepAttributeString(idStep, "databaseName");
- this.tableName = rep.getStepAttributeString(idStep, "tableName");
- this.dimensionCount = rep.getStepAttributeString(idStep, "dimensionCount");
- this.noDictionaryDims = rep.getStepAttributeString(idStep, "noDictionaryDims");
- this.noDictionaryDims = rep.getStepAttributeString(idStep, "noDictionaryDimsMapping");
- this.complexDimensionCount = rep.getStepAttributeString(idStep, "complexDimensionCount");
- this.measureCount = rep.getStepAttributeString(idStep, "measureCount");
- this.updateMemberRequest = rep.getStepAttributeString(idStep, "isUpdateMemberRequest");
- this.measureDataType = rep.getStepAttributeString(idStep, "measureDataType");
- this.partitionID = rep.getStepAttributeString(idStep, "partitionID");
- this.segmentId = rep.getStepAttributeString(idStep, "segmentId");
- this.taskNo = rep.getStepAttributeString(idStep, "taskNo");
- } catch (Exception ex) {
- throw new KettleException(BaseMessages
- .getString(PKG, "CarbonDataWriterStepMeta.Exception.UnexpectedErrorInReadingStepInfo",
- new String[0]), ex);
- }
- }
-
- /**
- * Checks the settings of this step and puts the findings in a remarks List.
- *
- * @param remarks The list to put the remarks in @see
- * org.pentaho.di.core.CheckResult
- * @param stepMeta The stepMeta to help checking
- * @param prev The fields coming from the previous step
- * @param input The input step names
- * @param output The output step names
- * @param info The fields that are used as information by the step
- */
- public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta,
- RowMetaInterface prev, String[] input, String[] output, RowMetaInterface info) {
- CarbonDataProcessorUtil.checkResult(remarks, stepMeta, input);
- }
-
- /**
- * Get the executing step, needed by Trans to launch a step.
- *
- * @param stepMeta The step info
- * @param stepDataInterface the step data interface linked to this step. Here the step can
- * store temporary data, database connections, etc.
- * @param copyNr The copy nr to get
- * @param transMeta The transformation info
- * @param trans The launching transformation
- */
- public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr,
- TransMeta transMeta, Trans trans) {
- return new SortKeyStep(stepMeta, stepDataInterface, copyNr, transMeta, trans);
- }
-
- /**
- * Get a new instance of the appropriate data class. This data class
- * implements the StepDataInterface. It basically contains the persisting
- * data that needs to live on, even if a worker thread is terminated.
- *
- * @return The appropriate StepDataInterface class.
- */
- public StepDataInterface getStepData() {
- return new SortKeyStepData();
- }
-
- /**
- * Below method will be used to get the out row size
- *
- * @return outputRowSize
- */
- public String getOutputRowSize() {
- return outputRowSize;
- }
-
- /**
- * below mthod will be used to set the out row size
- *
- * @param outputRowSize
- */
- public void setOutputRowSize(String outputRowSize) {
- this.outputRowSize = outputRowSize;
- }
-
- /**
- * This method will return the table name
- *
- * @return tabelName
- */
-
- public String getTabelName() {
- return this.tabelName;
- }
-
- /**
- * This method will set the table name
- *
- * @param tabelName
- */
- public void setTabelName(String tabelName) {
- this.tabelName = tabelName;
- }
-
- /**
- * @param tableName the tableName to set
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @return the databaseName
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * @param databaseName the databaseName to set
- */
- public void setDatabaseName(String databaseName) {
- this.databaseName = databaseName;
- }
-
- /**
- * @return the dimensionCount
- */
- public int getDimensionCount() {
- return Integer.parseInt(dimensionCount);
- }
-
- public void setDimensionCount(String dimensionCount) {
- this.dimensionCount = dimensionCount;
- }
-
- /**
- * @return the complexDimensionCount
- */
- public int getComplexDimensionCount() {
- return Integer.parseInt(complexDimensionCount);
- }
-
- public void setComplexDimensionCount(String complexDimensionCount) {
- this.complexDimensionCount = complexDimensionCount;
- }
-
- /**
- * @return the measureCount
- */
- public int getMeasureCount() {
- return Integer.parseInt(measureCount);
- }
-
- /**
- * @param measureCount the measureCount to set
- */
- public void setMeasureCount(String measureCount) {
- this.measureCount = measureCount;
- }
-
- /**
- * @param isUpdateMemberRequest the isUpdateMemberRequest to set
- */
- public void setIsUpdateMemberRequest(String isUpdateMemberRequest) {
- this.updateMemberRequest = isUpdateMemberRequest;
- }
-
- public void setMeasureDataType(String measureDataType) {
- this.measureDataType = measureDataType;
- }
-
- public String getNoDictionaryDims() {
- return noDictionaryDims;
- }
-
- public void setNoDictionaryDims(String noDictionaryDims) {
- this.noDictionaryDims = noDictionaryDims;
- }
-
- /**
- * @return the noDictionaryCount
- */
- public int getNoDictionaryCount() {
- return noDictionaryCount;
- }
-
- /**
- * @param noDictionaryCount the noDictionaryCount to set
- */
- public void setNoDictionaryCount(int noDictionaryCount) {
- this.noDictionaryCount = noDictionaryCount;
- }
-
- /**
- * @return partitionId
- */
- public String getPartitionID() {
- return partitionID;
- }
-
- /**
- * @param partitionID
- */
- public void setPartitionID(String partitionID) {
- this.partitionID = partitionID;
- }
-
- /**
- * return segmentId
- *
- * @return
- */
- public String getSegmentId() {
- return segmentId;
- }
-
- /**
- * set segment Id
- *
- * @param segmentId
- */
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- /**
- * @param taskNo
- */
- public void setTaskNo(String taskNo) {
- this.taskNo = taskNo;
- }
-
- /**
- * @return
- */
- public String getTaskNo() {
- return taskNo;
- }
-
- public String getNoDictionaryDimsMapping() {
- return noDictionaryDimsMapping;
- }
-
- public void setNoDictionaryDimsMapping(String noDictionaryDimsMapping) {
- this.noDictionaryDimsMapping = noDictionaryDimsMapping;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 3806c55..2affa03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -65,13 +65,13 @@ import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.util.ValueCompressionUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.mdkeygen.file.FileManager;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder;
import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax;
import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
import org.apache.carbondata.processing.store.colgroup.DataHolder;
+import org.apache.carbondata.processing.store.file.FileManager;
+import org.apache.carbondata.processing.store.file.IFileManagerComposite;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
@@ -256,8 +256,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private boolean compactionFlow;
- private boolean useKettle;
-
private int bucketNumber;
private long schemaUpdatedTimeStamp;
@@ -279,7 +277,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.dimensionCount = carbonFactDataHandlerModel.getDimensionCount();
this.complexIndexMap = carbonFactDataHandlerModel.getComplexIndexMap();
this.primitiveDimLens = carbonFactDataHandlerModel.getPrimitiveDimLens();
- this.useKettle = carbonFactDataHandlerModel.isUseKettle();
this.isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
@@ -481,144 +478,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
}
- // TODO remove after kettle flow is removed
- private NodeHolder processDataRows(List<Object[]> dataRows) throws CarbonDataWriterException {
- Object[] max = new Object[measureCount];
- Object[] min = new Object[measureCount];
- int[] decimal = new int[measureCount];
- Object[] uniqueValue = new Object[measureCount];
- // to store index of the measure columns which are null
- BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
- for (int i = 0; i < max.length; i++) {
- if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
- max[i] = Long.MIN_VALUE;
- } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
- max[i] = -Double.MAX_VALUE;
- } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- max[i] = new BigDecimal(-Double.MAX_VALUE);
- } else {
- max[i] = 0.0;
- }
- }
- for (int i = 0; i < min.length; i++) {
- if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
- min[i] = Long.MAX_VALUE;
- uniqueValue[i] = Long.MIN_VALUE;
- } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
- min[i] = Double.MAX_VALUE;
- uniqueValue[i] = Double.MIN_VALUE;
- } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- min[i] = new BigDecimal(Double.MAX_VALUE);
- uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
- } else {
- min[i] = 0.0;
- uniqueValue[i] = 0.0;
- }
- }
- for (int i = 0; i < decimal.length; i++) {
- decimal[i] = 0;
- }
-
- byte[] startKey = null;
- byte[] endKey = null;
- byte[] noDictStartKey = null;
- byte[] noDictEndKey = null;
- CarbonWriteDataHolder[] dataHolder = initialiseDataHolder(dataRows.size());
- CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolder(dataRows.size());
- CarbonWriteDataHolder noDictionaryKeyDataHolder = null;
- if ((noDictionaryCount + complexColCount) > 0) {
- noDictionaryKeyDataHolder = initialiseKeyBlockHolder(dataRows.size());
- }
-
- for (int count = 0; count < dataRows.size(); count++) {
- Object[] row = dataRows.get(count);
- byte[] mdKey = (byte[]) row[this.mdKeyIndex];
- byte[] noDictionaryKey = null;
- if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
- noDictionaryKey = (byte[]) row[this.mdKeyIndex - 1];
- }
- ByteBuffer byteBuffer = null;
- byte[] b = null;
- if (count == 0) {
- startKey = mdKey;
- noDictStartKey = noDictionaryKey;
- }
- endKey = mdKey;
- noDictEndKey = noDictionaryKey;
- // add to key store
- if (mdKey.length > 0) {
- keyDataHolder.setWritableByteArrayValueByIndex(count, mdKey);
- }
- // for storing the byte [] for high card.
- if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
- noDictionaryKeyDataHolder.setWritableByteArrayValueByIndex(count, noDictionaryKey);
- }
- //Add all columns to keyDataHolder
- keyDataHolder.setWritableByteArrayValueByIndex(count, this.mdKeyIndex, row);
- // CHECKSTYLE:OFF Approval No:Approval-351
- for (int k = 0; k < otherMeasureIndex.length; k++) {
- if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
- if (null == row[otherMeasureIndex[k]]) {
- nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
- dataHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
- } else {
- dataHolder[otherMeasureIndex[k]]
- .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
- }
- } else {
- if (null == row[otherMeasureIndex[k]]) {
- nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
- dataHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
- } else {
- dataHolder[otherMeasureIndex[k]]
- .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
- }
- }
- }
- calculateMaxMin(max, min, decimal, otherMeasureIndex, row);
- for (int i = 0; i < customMeasureIndex.length; i++) {
- if (null == row[customMeasureIndex[i]]
- && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- BigDecimal val = BigDecimal.valueOf(0);
- b = DataTypeUtil.bigDecimalToByte(val);
- nullValueIndexBitSet[customMeasureIndex[i]].set(count);
- } else {
- if (this.compactionFlow) {
- BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
- b = DataTypeUtil.bigDecimalToByte(bigDecimal);
- } else {
- b = (byte[]) row[customMeasureIndex[i]];
- }
- }
- byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
- byteBuffer.putInt(b.length);
- byteBuffer.put(b);
- byteBuffer.flip();
- b = byteBuffer.array();
- dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
- }
- calculateMaxMin(max, min, decimal, customMeasureIndex, row);
- }
- calculateUniqueValue(min, uniqueValue);
- byte[][] byteArrayValues = keyDataHolder.getByteArrayValues().clone();
- byte[][] noDictionaryValueHolder = null;
- if ((noDictionaryCount + complexColCount) > 0) {
- noDictionaryValueHolder = noDictionaryKeyDataHolder.getByteArrayValues();
- }
- WriterCompressModel compressionModel = ValueCompressionUtil
- .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[max.length]);
- byte[][] writableMeasureDataArray =
- StoreFactory.createDataStore(compressionModel).getWritableMeasureDataArray(dataHolder)
- .clone();
- NodeHolder nodeHolder =
- getNodeHolderObject(writableMeasureDataArray, byteArrayValues, dataRows.size(), startKey,
- endKey, compressionModel, noDictionaryValueHolder, noDictStartKey, noDictEndKey,
- nullValueIndexBitSet);
- LOGGER.info("Number Of records processed: " + dataRows.size());
- return nodeHolder;
- }
-
- private NodeHolder processDataRowsWithOutKettle(List<Object[]> dataRows)
+ private NodeHolder processDataRows(List<Object[]> dataRows)
throws CarbonDataWriterException {
Object[] max = new Object[measureCount];
Object[] min = new Object[measureCount];
@@ -661,7 +521,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
byte[][] noDictStartKey = null;
byte[][] noDictEndKey = null;
CarbonWriteDataHolder[] dataHolder = initialiseDataHolder(dataRows.size());
- CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolderWithOutKettle(dataRows.size());
+ CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolder(dataRows.size());
CarbonWriteDataHolder noDictionaryKeyDataHolder = null;
if ((noDictionaryCount + complexColCount) > 0) {
noDictionaryKeyDataHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
@@ -756,142 +616,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
StoreFactory.createDataStore(compressionModel).getWritableMeasureDataArray(dataHolder)
.clone();
NodeHolder nodeHolder =
- getNodeHolderObjectWithOutKettle(writableMeasureDataArray, byteArrayValues, dataRows.size(),
+ getNodeHolderObject(writableMeasureDataArray, byteArrayValues, dataRows.size(),
startKey, endKey, compressionModel, noDictionaryValueHolder, noDictStartKey,
noDictEndKey, nullValueIndexBitSet);
LOGGER.info("Number Of records processed: " + dataRows.size());
return nodeHolder;
}
- // TODO remove after kettle flow is removed
- private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, byte[][] byteArrayValues,
- int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
- WriterCompressModel compressionModel, byte[][] noDictionaryData, byte[] noDictionaryStartKey,
- byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet) throws CarbonDataWriterException {
- byte[][][] noDictionaryColumnsData = null;
- List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
- int complexColCount = getComplexColsCount();
-
- for (int i = 0; i < complexColCount; i++) {
- colsAndValues.add(new ArrayList<byte[]>());
- }
- int noOfColumn = colGrpModel.getNoOfColumnStore();
- DataHolder[] dataHolders = getDataHolders(noOfColumn, byteArrayValues.length);
- for (int i = 0; i < byteArrayValues.length; i++) {
- byte[][] splitKey = columnarSplitter.splitKey(byteArrayValues[i]);
-
- for (int j = 0; j < splitKey.length; j++) {
- dataHolders[j].addData(splitKey[j], i);
- }
- }
- if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
- noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryData.length][];
- for (int i = 0; i < noDictionaryData.length; i++) {
- int complexColumnIndex = primitiveDimLens.length + noDictionaryCount;
- byte[][] splitKey = NonDictionaryUtil
- .splitNoDictionaryKey(noDictionaryData[i], noDictionaryCount + complexIndexMap.size());
-
- int complexTypeIndex = 0;
- for (int j = 0; j < splitKey.length; j++) {
- //nodictionary Columns
- if (j < noDictionaryCount) {
- noDictionaryColumnsData[j][i] = splitKey[j];
- }
- //complex types
- else {
- // Need to write columnar block from complex byte array
- int index = complexColumnIndex - noDictionaryCount;
- GenericDataType complexDataType = complexIndexMap.get(index);
- complexColumnIndex++;
- if (complexDataType != null) {
- List<ArrayList<byte[]>> columnsArray = new ArrayList<ArrayList<byte[]>>();
- for (int k = 0; k < complexDataType.getColsCount(); k++) {
- columnsArray.add(new ArrayList<byte[]>());
- }
-
- try {
- ByteBuffer complexDataWithoutBitPacking = ByteBuffer.wrap(splitKey[j]);
- byte[] complexTypeData = new byte[complexDataWithoutBitPacking.getShort()];
- complexDataWithoutBitPacking.get(complexTypeData);
-
- ByteBuffer byteArrayInput = ByteBuffer.wrap(complexTypeData);
- ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
- complexDataType
- .parseAndBitPack(byteArrayInput, dataOutputStream, this.complexKeyGenerator);
- complexDataType.getColumnarDataForComplexType(columnsArray,
- ByteBuffer.wrap(byteArrayOutput.toByteArray()));
- byteArrayOutput.close();
- } catch (IOException e) {
- throw new CarbonDataWriterException(
- "Problem while bit packing and writing complex datatype", e);
- } catch (KeyGenException e) {
- throw new CarbonDataWriterException(
- "Problem while bit packing and writing complex datatype", e);
- }
-
- for (ArrayList<byte[]> eachColumn : columnsArray) {
- colsAndValues.get(complexTypeIndex++).addAll(eachColumn);
- }
- } else {
- // This case not possible as ComplexType is the last columns
- }
- }
- }
- }
- }
- thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
- CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
- ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
- List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
- primitiveDimLens.length + noDictionaryCount + complexColCount);
- int i = 0;
- int dictionaryColumnCount = -1;
- int noDictionaryColumnCount = -1;
- for (i = 0; i < dimensionType.length; i++) {
- if (dimensionType[i]) {
- dictionaryColumnCount++;
- if (colGrpModel.isColumnar(dictionaryColumnCount)) {
- submit.add(executorService.submit(
- new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
- isUseInvertedIndex[i])));
- } else {
- submit.add(
- executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
- }
- } else {
- submit.add(executorService.submit(
- new BlockSortThread(i, noDictionaryColumnsData[++noDictionaryColumnCount], false, true,
- true, isUseInvertedIndex[i])));
- }
- }
- for (int k = 0; k < complexColCount; k++) {
- submit.add(executorService.submit(new BlockSortThread(i++,
- colsAndValues.get(k).toArray(new byte[colsAndValues.get(k).size()][]), false, true)));
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(1, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- LOGGER.error(e, e.getMessage());
- }
- IndexStorage[] blockStorage =
- new IndexStorage[colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount];
- try {
- for (int k = 0; k < blockStorage.length; k++) {
- blockStorage[k] = submit.get(k).get();
- }
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- }
- return this.dataWriter
- .buildDataNodeHolder(blockStorage, dataHolderLocal, entryCountLocal, startkeyLocal,
- endKeyLocal, compressionModel, noDictionaryStartKey, noDictionaryEndKey,
- nullValueIndexBitSet);
- }
-
- private NodeHolder getNodeHolderObjectWithOutKettle(byte[][] dataHolderLocal,
+ private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal,
byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
WriterCompressModel compressionModel, byte[][][] noDictionaryData,
byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
@@ -1343,16 +1075,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
return blockKeySizeWithComplexTypes;
}
- // TODO Remove after kettle flow got removed.
private CarbonWriteDataHolder initialiseKeyBlockHolder(int size) {
CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
- keyDataHolder.initialiseByteArrayValues(size);
- return keyDataHolder;
- }
-
- private CarbonWriteDataHolder initialiseKeyBlockHolderWithOutKettle(int size) {
- CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
- keyDataHolder.initialiseByteArrayValuesWithOutKettle(size);
+ keyDataHolder.initialiseByteArrayValuesForKey(size);
return keyDataHolder;
}
@@ -1556,12 +1281,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
@Override public Void call() throws Exception {
try {
- NodeHolder nodeHolder;
- if (useKettle) {
- nodeHolder = processDataRows(dataRows);
- } else {
- nodeHolder = processDataRowsWithOutKettle(dataRows);
- }
+ NodeHolder nodeHolder = processDataRows(dataRows);
// insert the object in array according to sequence number
int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
blockletDataHolder.put(nodeHolder, indexInNodeHolderArray);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 59f2eb3..ffd23a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -171,11 +171,6 @@ public class CarbonFactDataHandlerModel {
*/
private boolean isCompactionFlow;
- /**
- * To use kettle flow to load or not.
- */
- private boolean useKettle = true;
-
private int bucketId = 0;
private String segmentId;
@@ -287,7 +282,6 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndex);
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
- carbonFactDataHandlerModel.setUseKettle(false);
if (noDictionaryCount > 0 || complexDimensionCount > 0) {
carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
} else {
@@ -500,14 +494,6 @@ public class CarbonFactDataHandlerModel {
this.wrapperColumnSchema = wrapperColumnSchema;
}
- public boolean isUseKettle() {
- return useKettle;
- }
-
- public void setUseKettle(boolean useKettle) {
- this.useKettle = useKettle;
- }
-
public int getBucketId() {
return bucketId;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 8c94328..68f9bd5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -95,8 +95,6 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
private char[] aggType;
- private boolean useKettle;
-
/**
* below code is to check whether dimension
* is of no dictionary type or not
@@ -105,7 +103,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName,
int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
- char[] aggType, boolean[] isNoDictionaryColumn, boolean useKettle) {
+ char[] aggType, boolean[] isNoDictionaryColumn) {
this.tempFileLocation = tempFileLocation;
this.tableName = tableName;
this.dimensionCount = dimensionCount;
@@ -114,7 +112,6 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
this.aggType = aggType;
this.noDictionaryCount = noDictionaryCount;
this.isNoDictionaryColumn = isNoDictionaryColumn;
- this.useKettle = useKettle;
}
/**
@@ -183,8 +180,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
// create chunk holder
SortTempFileChunkHolder sortTempFileChunkHolder =
new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
- measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn,
- useKettle);
+ measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn);
// initialize
sortTempFileChunkHolder.initialize();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
new file mode 100644
index 0000000..ddd9bf2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store.file;
+
+/** File entry in the {@link FileManager} composite that also records a store path. */
+public class FileData extends FileManager {
+
+  /** Store path associated with this file; assigned once, in the constructor. */
+  private final String storePath;
+
+  /**
+   * Creates an entry named {@code fileName} associated with {@code storePath}.
+   *
+   * @param fileName  file name, kept in the inherited {@code fileName} field
+   * @param storePath store path associated with this file
+   */
+  public FileData(String fileName, String storePath) {
+    this.fileName = fileName;
+    this.storePath = storePath;
+  }
+
+  /**
+   * @return the file name given at construction, or the one set later via {@code setName}
+   */
+  public String getFileName() {
+    return fileName;
+  }
+
+  /**
+   * @return the store path given at construction
+   */
+  public String getStorePath() {
+    return storePath;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
new file mode 100644
index 0000000..cfa3a66
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store.file;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/** Composite node that keeps its child entries in a list and carries a name. */
+public class FileManager implements IFileManagerComposite {
+  /** Child entries of this composite node, kept in insertion order. */
+  protected List<IFileManagerComposite> listOfFileData =
+      new ArrayList<IFileManagerComposite>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+  /** Name of this entry; assigned via {@link #setName(String)} or by subclasses. */
+  protected String fileName;
+
+  @Override public void add(IFileManagerComposite customData) {
+    listOfFileData.add(customData);
+  }
+
+  /** Removes the first occurrence of {@code customData}; no-op if it is not a child. */
+  @Override public void remove(IFileManagerComposite customData) {
+    listOfFileData.remove(customData);
+  }
+
+  @Override public IFileManagerComposite get(int i) {
+    return listOfFileData.get(i);
+  }
+
+  @Override public void setName(String name) {
+    this.fileName = name;
+  }
+
+  /**
+   * Returns the number of child entries.
+   */
+  @Override public int size() {
+    return listOfFileData.size();
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
new file mode 100644
index 0000000..6691772
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store.file;
+/** Composite-pattern node for file entries: children can be added, removed and looked up. */
+public interface IFileManagerComposite {
+  /**
+   * Adds a child entry, which may itself be either a folder (composite) or a file.
+   *
+   * @param customData the child entry to add
+   */
+  void add(IFileManagerComposite customData);
+
+  /**
+   * Removes the given child entry from this composite's hierarchy.
+   *
+   * @param customData the child entry to remove
+   */
+  void remove(IFileManagerComposite customData);
+
+  /**
+   * Returns the child entry at index {@code i} (zero-based).
+   * @param i index of the child
+   * @return the child entry at that index
+   */
+  IFileManagerComposite get(int i);
+
+  /**
+   * Sets the name of this entry.
+   *
+   * @param name the new name
+   */
+  void setName(String name);
+
+  /**
+   * Returns the number of child entries.
+   *
+   * @return the child count
+   */
+  int size();
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a73b356..cda907c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -66,7 +66,7 @@ import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.IndexHeader;
-import org.apache.carbondata.processing.mdkeygen.file.FileData;
+import org.apache.carbondata.processing.store.file.FileData;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.commons.lang3.ArrayUtils;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 9ed0baa..ce53ec8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -20,8 +20,8 @@ import java.util.List;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.file.IFileManagerComposite;
/**
* Value object for writing the data
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
deleted file mode 100644
index 86b63df..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
-
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.writer.HierarchyValueWriterForCSV;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
-import org.apache.carbondata.processing.schema.metadata.ArrayWrapper;
-import org.apache.carbondata.processing.schema.metadata.ColumnSchemaDetails;
-import org.apache.carbondata.processing.schema.metadata.ColumnsInfo;
-
-import org.pentaho.di.core.exception.KettleException;
-
-public abstract class CarbonCSVBasedDimSurrogateKeyGen {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonCSVBasedDimSurrogateKeyGen.class.getName());
- /**
- * max
- */
- protected int[] max;
- /**
- * connection
- */
- protected Connection connection;
- /**
- * hierInsertFileNames
- */
- protected Map<String, String> hierInsertFileNames;
- /**
- * dimInsertFileNames
- */
- protected String[] dimInsertFileNames;
- /**
- * columnsInfo
- */
- protected ColumnsInfo columnsInfo;
- /**
- * primary key max surrogate key map
- */
- protected Map<String, Integer> primaryKeysMaxSurroagetMap;
- /**
- * Measure max surrogate key map
- */
- protected Map<String, Integer> measureMaxSurroagetMap;
- /**
- * File manager
- */
- protected IFileManagerComposite fileManager;
-
- /**
- * Cache should be map only. because, multiple levels can map to same
- * database column. This case duplicate storage should be avoided.
- */
- private Map<String, Dictionary> dictionaryCaches;
- /**
- * Year Cache
- */
- private Map<String, Map<String, Integer>> timeDimCache;
- /**
- * dimsFiles
- */
- private String[] dimsFiles;
- /**
- * timeDimMax
- */
- private int[] timDimMax;
- /**
- * hierCache
- */
- private Map<String, Map<Integer, int[]>> hierCache =
- new HashMap<String, Map<Integer, int[]>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- /**
- *
- */
- private Map<String, Map<ArrayWrapper, Integer>> hierCacheReverse =
- new HashMap<String, Map<ArrayWrapper, Integer>>(
- CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- /**
- * dimension ordinal to dimension mapping
- */
- private CarbonDimension[] dimensionOrdinalToDimensionMapping;
- /**
- * rwLock2
- */
- private ReentrantReadWriteLock rwLock2 = new ReentrantReadWriteLock();
- /**
- * wLock2
- */
- protected Lock wLock2 = rwLock2.writeLock();
- /**
- * Store Folder Name with Load number.
- */
- private String storeFolderWithLoadNumber;
-
- /**
- * @param columnsInfo ColumnsInfo With all the required details for surrogate key generation and
- * hierarchy entries.
- */
- public CarbonCSVBasedDimSurrogateKeyGen(ColumnsInfo columnsInfo) {
- this.columnsInfo = columnsInfo;
-
- setDimensionTables(columnsInfo.getDimColNames());
- setHierFileNames(columnsInfo.getHierTables());
- }
-
- /**
- * @param tuple The string value whose surrogate key will be gennerated.
- * @param tabColumnName The K of dictionaryCaches Map, for example "tablename_columnname"
- */
- public Integer generateSurrogateKeys(String tuple, String tabColumnName) throws KettleException {
- Integer key = null;
- Dictionary dicCache = dictionaryCaches.get(tabColumnName);
- key = dicCache.getSurrogateKey(tuple);
- return key;
- }
-
- /**
- * @param tuple The string value whose surrogate key will be gennerated.
- * @param tabColumnName The K of dictionaryCaches Map, for example "tablename_columnname"
- */
- public Integer generateSurrogateKeys(String tuple, String tabColumnName, String columnId)
- throws KettleException {
- Integer key = null;
- Dictionary dicCache = dictionaryCaches.get(tabColumnName);
- if (null == dicCache) {
- ColumnSchemaDetails columnSchemaDetails =
- this.columnsInfo.getColumnSchemaDetailsWrapper().get(columnId);
- if (columnSchemaDetails.isDirectDictionary()) {
- DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(columnSchemaDetails.getColumnType());
- key = directDictionaryGenerator.generateDirectSurrogateKey(tuple);
- }
- } else {
- key = dicCache.getSurrogateKey(tuple);
- }
- return key;
- }
-
-
- public Integer generateSurrogateKeysForTimeDims(String tuple, String columnName, int index,
- Object[] props) throws KettleException {
- Integer key = null;
- Dictionary dicCache = dictionaryCaches.get(columnName);
- key = dicCache.getSurrogateKey(tuple);
- if (key == null) {
- if (timDimMax[index] >= columnsInfo.getMaxKeys()[index]) {
- if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(tuple)) {
- tuple = null;
- }
- LOGGER.error("Invalid cardinality. Key size exceeded cardinality for: " + columnsInfo
- .getDimColNames()[index] + ": MemberValue: " + tuple);
- return -1;
- }
- timDimMax[index]++;
- Map<String, Integer> timeCache = timeDimCache.get(columnName);
- // Extract properties from tuple
- // Need to create a new surrogate key.
- key = getSurrogateFromStore(tuple, index, props);
- if (null != timeCache) {
- timeCache.put(tuple, key);
- }
- } else {
- return updateSurrogateToStore(tuple, columnName, index, key, props);
- }
- return key;
- }
-
- public void checkNormalizedHierExists(int[] val, String hier,
- HierarchyValueWriterForCSV hierWriter) throws KettleException {
- Map<ArrayWrapper, Integer> cache = hierCacheReverse.get(hier);
-
- ArrayWrapper wrapper = new ArrayWrapper(val);
- Integer hCache = cache.get(wrapper);
- if (hCache != null) {
- return;
- } else {
- wLock2.lock();
- try {
- getNormalizedHierFromStore(val, hier, 1, hierWriter);
- // Store in cache
- cache.put(wrapper, 1);
- } finally {
- wLock2.unlock();
- }
- }
- }
-
- public void close() throws Exception {
- if (null != connection) {
- connection.close();
- }
- }
-
- /**
- * Search entry and insert if not found in store.
- *
- * @param val
- * @param hier
- * @return
- * @throws KeyGenException
- * @throws KettleException
- */
- protected abstract byte[] getNormalizedHierFromStore(int[] val, String hier, int primaryKey,
- HierarchyValueWriterForCSV hierWriter) throws KettleException;
-
- /**
- * Search entry and insert if not found in store.
- *
- * @param value
- * @param index
- * @param properties - Ordinal column, name column and all other properties
- * @return
- * @throws KettleException
- */
- protected abstract int getSurrogateFromStore(String value, int index, Object[] properties)
- throws KettleException;
-
- /**
- * Search entry and insert if not found in store.
- *
- * @param value
- * @param columnName
- * @param index
- * @param properties - Ordinal column, name column and all other properties
- * @return
- * @throws KettleException
- */
- protected abstract int updateSurrogateToStore(String value, String columnName, int index, int key,
- Object[] properties) throws KettleException;
-
- /**
- * generate the surroagate key for the measure values.
- *
- * @return
- * @throws KettleException
- */
- public abstract int getSurrogateForMeasure(String tuple, String columnName)
- throws KettleException;
-
- private Map<Integer, int[]> getHCache(String hName) {
- Map<Integer, int[]> hCache = hierCache.get(hName);
- if (hCache == null) {
- hCache = new HashMap<Integer, int[]>();
- hierCache.put(hName, hCache);
- }
-
- return hCache;
- }
-
- private Map<ArrayWrapper, Integer> getHCacheReverse(String hName) {
- Map<ArrayWrapper, Integer> hCache = hierCacheReverse.get(hName);
- if (hCache == null) {
- hCache = new HashMap<ArrayWrapper, Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- hierCacheReverse.put(hName, hCache);
- }
-
- return hCache;
- }
-
- private void setHierFileNames(Set<String> set) {
- hierInsertFileNames =
- new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (String s : set) {
- hierInsertFileNames.put(s, s + CarbonCommonConstants.HIERARCHY_FILE_EXTENSION);
-
- // fix hierStream is null issue
- getHCache(s);
- getHCacheReverse(s);
- }
- }
-
- private void setDimensionTables(String[] dimeFileNames) {
- int noOfPrimitiveDims = 0;
- List<String> dimFilesForPrimitives = new ArrayList<String>();
- List<Boolean> isDirectDictionary = new ArrayList<Boolean>();
- dictionaryCaches = new ConcurrentHashMap<String, Dictionary>();
- for (int i = 0; i < dimeFileNames.length; i++) {
- GenericDataType complexType = columnsInfo.getComplexTypesMap()
- .get(dimeFileNames[i].substring(columnsInfo.getTableName().length() + 1));
- if (complexType != null) {
- List<GenericDataType> primitiveChild = new ArrayList<GenericDataType>();
- complexType.getAllPrimitiveChildren(primitiveChild);
- for (GenericDataType eachPrimitive : primitiveChild) {
- dimFilesForPrimitives.add(
- columnsInfo.getTableName() + CarbonCommonConstants.UNDERSCORE + eachPrimitive
- .getName());
- eachPrimitive.setSurrogateIndex(noOfPrimitiveDims);
- noOfPrimitiveDims++;
- ColumnSchemaDetails columnSchemaDetails =
- columnsInfo.getColumnSchemaDetailsWrapper().get(eachPrimitive.getColumnId());
- if (columnSchemaDetails.isDirectDictionary()) {
- isDirectDictionary.add(true);
- } else {
- isDirectDictionary.add(false);
- }
- }
- } else {
- dimFilesForPrimitives.add(dimeFileNames[i]);
- noOfPrimitiveDims++;
- isDirectDictionary.add(false);
- }
- }
- max = new int[noOfPrimitiveDims];
- for (int i = 0; i < isDirectDictionary.size(); i++) {
- if (isDirectDictionary.get(i)) {
- max[i] = Integer.MAX_VALUE;
- }
- }
- this.dimsFiles = dimFilesForPrimitives.toArray(new String[dimFilesForPrimitives.size()]);
-
- createRespectiveDimFilesForDimTables();
- }
-
- private void createRespectiveDimFilesForDimTables() {
- int dimCount = this.dimsFiles.length;
- dimInsertFileNames = new String[dimCount];
- System.arraycopy(dimsFiles, 0, dimInsertFileNames, 0, dimCount);
- }
-
- /**
- * isCacheFilled
- *
- * @param columnNames
- * @return boolean
- */
- public abstract boolean isCacheFilled(String[] columnNames);
-
- /**
- * @return Returns the storeFolderWithLoadNumber.
- */
- public String getStoreFolderWithLoadNumber() {
- return storeFolderWithLoadNumber;
- }
-
- /**
- * @param storeFolderWithLoadNumber The storeFolderWithLoadNumber to set.
- */
- public void setStoreFolderWithLoadNumber(String storeFolderWithLoadNumber) {
- this.storeFolderWithLoadNumber = storeFolderWithLoadNumber;
- }
-
- /**
- * @return Returns the dictionaryCaches.
- */
- public Map<String, Dictionary> getDictionaryCaches() {
- return dictionaryCaches;
- }
-
- /**
- * @param dictionaryCaches The dictionaryCaches to set.
- */
- public void setDictionaryCaches(Map<String, Dictionary> dictionaryCaches) {
- this.dictionaryCaches = dictionaryCaches;
- }
-
- /**
- * @return Returns the timeDimCache.
- */
- public Map<String, Map<String, Integer>> getTimeDimCache() {
- return timeDimCache;
- }
-
- /**
- * @param timeDimCache The timeDimCache to set.
- */
- public void setTimeDimCache(Map<String, Map<String, Integer>> timeDimCache) {
- this.timeDimCache = timeDimCache;
- }
-
- /**
- * @return Returns the dimsFiles.
- */
- public String[] getDimsFiles() {
- return dimsFiles;
- }
-
- /**
- * @param dimsFiles The dimsFiles to set.
- */
- public void setDimsFiles(String[] dimsFiles) {
- this.dimsFiles = dimsFiles;
- }
-
- /**
- * @return Returns the hierCache.
- */
- public Map<String, Map<Integer, int[]>> getHierCache() {
- return hierCache;
- }
-
- /**
- * @param hierCache The hierCache to set.
- */
- public void setHierCache(Map<String, Map<Integer, int[]>> hierCache) {
- this.hierCache = hierCache;
- }
-
- /**
- * @return Returns the timDimMax.
- */
- public int[] getTimDimMax() {
- return timDimMax;
- }
-
- /**
- * @param timDimMax The timDimMax to set.
- */
- public void setTimDimMax(int[] timDimMax) {
- this.timDimMax = timDimMax;
- }
-
- /**
- * @return the hierCacheReverse
- */
- public Map<String, Map<ArrayWrapper, Integer>> getHierCacheReverse() {
- return hierCacheReverse;
- }
-
- /**
- * @param hierCacheReverse the hierCacheReverse to set
- */
- public void setHierCacheReverse(Map<String, Map<ArrayWrapper, Integer>> hierCacheReverse) {
- this.hierCacheReverse = hierCacheReverse;
- }
-
- public int[] getMax() {
- return max;
- }
-
- public void setMax(int[] max) {
- this.max = max;
- }
-
- /**
- * @return the measureMaxSurroagetMap
- */
- public Map<String, Integer> getMeasureMaxSurroagetMap() {
- return measureMaxSurroagetMap;
- }
-
- /**
- * @param measureMaxSurroagetMap the measureMaxSurroagetMap to set
- */
- public void setMeasureMaxSurroagetMap(Map<String, Integer> measureMaxSurroagetMap) {
- this.measureMaxSurroagetMap = measureMaxSurroagetMap;
- }
-
- /**
- * @return
- */
- public CarbonDimension[] getDimensionOrdinalToDimensionMapping() {
- return dimensionOrdinalToDimensionMapping;
- }
-
- /**
- * @param dimensionOrdinalToDimensionMapping
- */
- public void setDimensionOrdinalToDimensionMapping(
- CarbonDimension[] dimensionOrdinalToDimensionMapping) {
- this.dimensionOrdinalToDimensionMapping = dimensionOrdinalToDimensionMapping;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenData.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenData.java
deleted file mode 100644
index d78dc1a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenData.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-
-import org.pentaho.di.core.row.RowMetaInterface;
-import org.pentaho.di.trans.step.BaseStepData;
-import org.pentaho.di.trans.step.StepDataInterface;
-
-/**
- * Kettle step data object (extends {@link BaseStepData}) that holds the objects the
- * step keeps between rows: the output row metadata, the dimension surrogate key
- * generator and a map of named {@link KeyGenerator}s.
- *
- * <p>{@link #clean()} drops every reference held here; after it runs,
- * {@link #getOutputRowMeta()}, {@link #getSurrogateKeyGen()} and
- * {@link #getKeyGenerators()} all return {@code null}.
- */
-public class CarbonCSVBasedSeqGenData extends BaseStepData implements StepDataInterface {
-
- /**
- * Row metadata set through {@link #setOutputRowMeta(RowMetaInterface)}.
- */
- private RowMetaInterface outputRowMeta;
-
- /**
- * Dimension surrogate key generator set through
- * {@link #setSurrogateKeyGen(CarbonCSVBasedDimSurrogateKeyGen)}.
- */
- private CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen;
-
- /**
- * Named key generators. There is no setter, so callers fill this map in place through
- * the reference returned by {@link #getKeyGenerators()}.
- */
- private Map<String, KeyGenerator> keyGenerators =
- new HashMap<String, KeyGenerator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- /**
- * Key generator set through {@link #setGenerator(KeyGenerator)}.
- * NOTE(review): no getter exists and nothing in this class reads it; it only keeps the
- * instance reachable until {@link #clean()}. Candidate for removal with its setter.
- */
- private KeyGenerator generator;
-
- /**
- * Size of the input rows, set through {@link #setInputSize(int)}.
- * NOTE(review): never read within this class; candidate for removal with its setter.
- */
- private int inputSize;
-
- /**
- * Creates an empty data holder; only {@link #getKeyGenerators()} is non-null initially.
- */
- public CarbonCSVBasedSeqGenData() {
- super();
- }
-
- /**
- * @return the dimension surrogate key generator, or {@code null} before it is set
- * or after {@link #clean()}
- */
- public CarbonCSVBasedDimSurrogateKeyGen getSurrogateKeyGen() {
- return surrogateKeyGen;
- }
-
- /**
- * @param surrogateKeyGen the dimension surrogate key generator to store
- */
- public void setSurrogateKeyGen(CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen) {
- this.surrogateKeyGen = surrogateKeyGen;
- }
-
- /**
- * @param inputSize the size of the input rows (stored but not read by this class)
- */
- public void setInputSize(int inputSize) {
- this.inputSize = inputSize;
- }
-
- /**
- * @param generator the key generator to store (not read by this class)
- */
- public void setGenerator(KeyGenerator generator) {
- this.generator = generator;
- }
-
- /**
- * Returns the live, mutable map of named key generators (not a copy).
- *
- * @return the key generator map, or {@code null} after {@link #clean()}
- */
- public Map<String, KeyGenerator> getKeyGenerators() {
- return keyGenerators;
- }
-
- /**
- * @return the output row metadata, or {@code null} before it is set or after
- * {@link #clean()}
- */
- public RowMetaInterface getOutputRowMeta() {
- return outputRowMeta;
- }
-
- /**
- * @param outputRowMeta the output row metadata to store
- */
- public void setOutputRowMeta(RowMetaInterface outputRowMeta) {
- this.outputRowMeta = outputRowMeta;
- }
-
- /**
- * Drops every object reference held by this instance (presumably so they can be
- * garbage-collected once the step finishes — confirm with the caller). The getters
- * return {@code null} afterwards.
- */
- public void clean() {
- outputRowMeta = null;
- surrogateKeyGen = null;
- generator = null;
- keyGenerators = null;
- }
-}