Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/06 10:49:45 UTC
[3/7] incubator-carbondata git commit: Compaction lock should also be
acquired during alter operations, since alter and compaction on the same
table must not run concurrently.
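The rationale: compaction rewrites segment files in place while alter
operations (add/drop column, rename, change datatype) rewrite the table
schema and paths, so the two must be mutually exclusive. The patch below
achieves this by having every alter command acquire the COMPACTION_LOCK
together with the METADATA_LOCK through a shared helper. A minimal Java
sketch of that acquire-all-or-fail pattern, assuming only the
CarbonLockFactory/ICarbonLock/LockUsage API visible in this diff (releasing
partially acquired locks on failure is an addition of this sketch, not
something the shown helper does):

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.carbondata.core.locks.CarbonLockFactory;
  import org.apache.carbondata.core.locks.ICarbonLock;
  import org.apache.carbondata.core.locks.LockUsage;
  import org.apache.carbondata.core.metadata.CarbonTableIdentifier;

  final class AlterLockSketch {
    // Acquire every requested lock, or roll back the ones already held and
    // fail, so a failed alter cannot leave the table half-locked.
    static List<ICarbonLock> acquireAll(CarbonTableIdentifier tableId, List<String> lockTypes) {
      List<ICarbonLock> held = new ArrayList<>();
      for (String lockType : lockTypes) {
        ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(tableId, lockType);
        if (lock.lockWithRetries()) {
          held.add(lock);
        } else {
          for (ICarbonLock acquired : held) {
            acquired.unlock();
          }
          throw new RuntimeException("Table is locked for updation. Please try after some time");
        }
      }
      return held;
    }
  }

An alter command would call acquireAll with, e.g., LockUsage.METADATA_LOCK
and LockUsage.COMPACTION_LOCK, run its body in a try block, and unlock each
held lock in finally, which is the shape AlterTableUtil
.validateTableAndAcquireLock and releaseLocks take further down in this patch.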
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
deleted file mode 100644
index 29aa7e7..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import java.io.File;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.spark.merger.exeception.SliceMergerException;
-
-/**
- * This is the Merger class responsible for the merging of the segments.
- */
-public class RowResultMerger {
-
- private final String databaseName;
- private final String tableName;
- private final String tempStoreLocation;
- private final String factStoreLocation;
- private CarbonFactHandler dataHandler;
- private List<RawResultIterator> rawResultIteratorList =
- new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- private SegmentProperties segprop;
- /**
- * record holder heap
- */
- private AbstractQueue<RawResultIterator> recordHolderHeap;
-
- private TupleConversionAdapter tupleConvertor;
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(RowResultMerger.class.getName());
-
- public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
- String tableName, SegmentProperties segProp, String tempStoreLocation,
- CarbonLoadModel loadModel, CompactionType compactionType) {
-
- CarbonDataFileAttributes carbonDataFileAttributes;
-
- this.rawResultIteratorList = iteratorList;
- // create the List of RawResultIterator.
-
- recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
- new RowResultMerger.CarbonMdkeyComparator());
-
- this.segprop = segProp;
- this.tempStoreLocation = tempStoreLocation;
-
- this.factStoreLocation = loadModel.getStorePath();
-
- if (!new File(tempStoreLocation).mkdirs()) {
- LOGGER.error("Error while new File(tempStoreLocation).mkdirs() ");
- }
-
- this.databaseName = databaseName;
- this.tableName = tableName;
-
- int measureCount = segprop.getMeasures().size();
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
- CarbonFactDataHandlerModel carbonFactDataHandlerModel =
- getCarbonFactDataHandlerModel(loadModel);
- carbonFactDataHandlerModel.setPrimitiveDimLens(segprop.getDimColumnsCardinality());
-
- if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
- int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
- CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(),
- carbonTable.getCarbonTableIdentifier()));
-
- // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
- // be written in same segment. So the TaskNo should be incremented by 1 from max val.
- int index = taskNo + 1;
- carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp());
- }
- else {
- carbonDataFileAttributes =
- new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
- loadModel.getFactTimeStamp());
- }
-
- carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
- if (segProp.getNumberOfNoDictionaryDimension() > 0
- || segProp.getComplexDimensions().size() > 0) {
- carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
- } else {
- carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
- }
- carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
- dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
-
- tupleConvertor = new TupleConversionAdapter(segProp);
- }
-
- /**
- * Merge function
- *
- */
- public boolean mergerSlice() {
- boolean mergeStatus = false;
- int index = 0;
- boolean isDataPresent = false;
- try {
-
- // add all iterators to the queue
- for (RawResultIterator leaftTupleIterator : this.rawResultIteratorList) {
- this.recordHolderHeap.add(leaftTupleIterator);
- index++;
- }
- RawResultIterator iterator = null;
- while (index > 1) {
- // poll the iterator whose head record is smallest
- iterator = this.recordHolderHeap.poll();
- Object[] convertedRow = iterator.next();
- if (null == convertedRow) {
- index--;
- continue;
- }
- if (!isDataPresent) {
- dataHandler.initialise();
- isDataPresent = true;
- }
- // get the mdkey
- addRow(convertedRow);
- // if this iterator has no more records then decrement the
- // index
- if (!iterator.hasNext()) {
- index--;
- continue;
- }
- // add record to heap
- this.recordHolderHeap.add(iterator);
- }
- // if the record holder is not empty then drain the remaining
- // iterator from the heap
- iterator = this.recordHolderHeap.poll();
- while (true) {
- Object[] convertedRow = iterator.next();
- if (null == convertedRow) {
- break;
- }
- // do it only once
- if (!isDataPresent) {
- dataHandler.initialise();
- isDataPresent = true;
- }
- addRow(convertedRow);
- // check if leaf contains no record
- if (!iterator.hasNext()) {
- break;
- }
- }
- if (isDataPresent)
- {
- this.dataHandler.finish();
- }
- mergeStatus = true;
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- LOGGER.error("Exception in compaction merger " + e.getMessage());
- mergeStatus = false;
- } finally {
- try {
- if (isDataPresent) {
- this.dataHandler.closeHandler();
- }
- } catch (CarbonDataWriterException e) {
- LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
- mergeStatus = false;
- }
- }
-
- return mergeStatus;
- }
-
- /**
- * Below method will be used to add sorted row
- *
- * @throws SliceMergerException
- */
- private void addRow(Object[] carbonTuple) throws SliceMergerException {
- Object[] rowInWritableFormat;
-
- rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
- try {
- this.dataHandler.addDataToStore(rowInWritableFormat);
- } catch (CarbonDataWriterException e) {
- throw new SliceMergerException("Problem in merging the slice", e);
- }
- }
-
- /**
- * This method will create a model object for carbon fact data handler
- *
- * @param loadModel
- * @return
- */
- private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel) {
- CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
- carbonFactDataHandlerModel.setDatabaseName(databaseName);
- carbonFactDataHandlerModel.setTableName(tableName);
- carbonFactDataHandlerModel.setMeasureCount(segprop.getMeasures().size());
- carbonFactDataHandlerModel.setCompactionFlow(true);
- carbonFactDataHandlerModel
- .setMdKeyLength(segprop.getDimensionKeyGenerator().getKeySizeInBytes());
- carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
- carbonFactDataHandlerModel.setDimLens(segprop.getDimColumnsCardinality());
- carbonFactDataHandlerModel.setSegmentProperties(segprop);
- carbonFactDataHandlerModel.setNoDictionaryCount(segprop.getNumberOfNoDictionaryDimension());
- carbonFactDataHandlerModel.setDimensionCount(
- segprop.getDimensions().size() - carbonFactDataHandlerModel.getNoDictionaryCount());
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
- List<ColumnSchema> wrapperColumnSchema = CarbonUtil
- .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName),
- carbonTable.getMeasureByTableName(tableName));
- carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
- // get the cardinality for all the columns including no dictionary columns
- int[] formattedCardinality =
- CarbonUtil.getFormattedCardinality(segprop.getDimColumnsCardinality(), wrapperColumnSchema);
- carbonFactDataHandlerModel.setColCardinality(formattedCardinality);
- // TODO: need to handle complex types here.
- Map<Integer, GenericDataType> complexIndexMap =
- new HashMap<Integer, GenericDataType>(segprop.getComplexDimensions().size());
- carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
- carbonFactDataHandlerModel.setDataWritingRequest(true);
-
- char[] aggType = new char[segprop.getMeasures().size()];
- Arrays.fill(aggType, 'n');
- int i = 0;
- for (CarbonMeasure msr : segprop.getMeasures()) {
- aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
- }
- carbonFactDataHandlerModel.setAggType(aggType);
- carbonFactDataHandlerModel.setFactDimLens(segprop.getDimColumnsCardinality());
-
- String carbonDataDirectoryPath =
- checkAndCreateCarbonStoreLocation(this.factStoreLocation, databaseName, tableName,
- loadModel.getPartitionId(), loadModel.getSegmentId());
- carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
-
- List<CarbonDimension> dimensionByTableName =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName);
- boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
- int index = 0;
- for (CarbonDimension dimension : dimensionByTableName) {
- isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex();
- }
- carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
- return carbonFactDataHandlerModel;
- }
-
- /**
- * This method will get the store location for the given path, segment id and partition id
- *
- * @return data directory path
- */
- private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName,
- String tableName, String partitionId, String segmentId) {
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
- CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
- String carbonDataDirectoryPath =
- carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
- CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
- return carbonDataDirectoryPath;
- }
-
- /**
- * Comparator class for comparing 2 raw row result.
- */
- private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
-
- @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
-
- Object[] row1 = new Object[0];
- Object[] row2 = new Object[0];
- try {
- row1 = o1.fetchConverted();
- row2 = o2.fetchConverted();
- } catch (KeyGenException e) {
- LOGGER.error(e.getMessage());
- }
- if (null == row1 || null == row2) {
- return 0;
- }
- ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
- ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
- int compareResult = 0;
- int[] columnValueSizes = segprop.getEachDimColumnValueSize();
- int dictionaryKeyOffset = 0;
- byte[] dimCols1 = key1.getDictionaryKey();
- byte[] dimCols2 = key2.getDictionaryKey();
- int noDicIndex = 0;
- for (int eachColumnValueSize : columnValueSizes) {
- // case of dictionary cols
- if (eachColumnValueSize > 0) {
-
- compareResult = ByteUtil.UnsafeComparer.INSTANCE
- .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
- dictionaryKeyOffset, eachColumnValueSize);
- dictionaryKeyOffset += eachColumnValueSize;
- } else { // case of no dictionary
-
- byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
- byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
- compareResult =
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
- noDicIndex++;
-
- }
- if (0 != compareResult) {
- return compareResult;
- }
- }
- return 0;
- }
- }
-
-}
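For readers skimming the deleted class: mergerSlice() is a k-way merge. Each
segment contributes an already-sorted RawResultIterator, the PriorityQueue
keyed by CarbonMdkeyComparator always exposes the iterator whose head row has
the smallest mdkey, and every polled row goes to the fact data handler. A
self-contained generic sketch of the same loop (kWayMerge and Head are
hypothetical names, not project code):

  import java.util.Comparator;
  import java.util.Iterator;
  import java.util.List;
  import java.util.PriorityQueue;
  import java.util.function.Consumer;

  final class KWayMergeSketch {
    static <T> void kWayMerge(List<? extends Iterator<T>> sources, Comparator<T> cmp,
        Consumer<T> writer) {
      // heap entry = an iterator paired with its current head row, so the
      // comparator can order iterators by the row each would produce next
      class Head {
        final Iterator<T> it;
        T row;
        Head(Iterator<T> it) { this.it = it; this.row = it.next(); }
      }
      PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> cmp.compare(a.row, b.row));
      for (Iterator<T> source : sources) {
        if (source.hasNext()) {
          heap.add(new Head(source));
        }
      }
      while (!heap.isEmpty()) {
        Head smallest = heap.poll();
        writer.accept(smallest.row);           // addRow(...) in RowResultMerger
        if (smallest.it.hasNext()) {
          smallest.row = smallest.it.next();   // advance and re-offer the iterator
          heap.add(smallest);
        }
      }
    }
  }

RowResultMerger gets away without the Head wrapper because RawResultIterator
can peek at its head row via fetchConverted(), which is exactly what
CarbonMdkeyComparator calls when the heap reorders.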
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
deleted file mode 100644
index d1a3a8d..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.merger;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-public class TableMeta implements Serializable {
-
- private static final long serialVersionUID = -1749874611119829431L;
-
- public CarbonTableIdentifier carbonTableIdentifier;
- public String storePath;
- public CarbonTable carbonTable;
-
- public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath,
- CarbonTable carbonTable) {
- this.carbonTableIdentifier = carbonTableIdentifier;
- this.storePath = storePath;
- this.carbonTable = carbonTable;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
deleted file mode 100644
index 08b563f..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-
-/**
- * This class will be used to convert the Result into the format used in data writer.
- */
-class TupleConversionAdapter {
-
- private final SegmentProperties segmentproperties;
-
- private int noDictionaryPresentIndex;
-
- private int measureCount;
-
- private boolean isNoDictionaryPresent;
-
- public TupleConversionAdapter(SegmentProperties segmentProperties) {
- this.measureCount = segmentProperties.getMeasures().size();
- this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0;
- if (isNoDictionaryPresent) {
- noDictionaryPresentIndex++;
- }
- this.segmentproperties = segmentProperties;
- }
-
- /**
- * Converting the raw result to the format understandable by the data writer.
- * @param carbonTuple
- * @return
- */
- public Object[] getObjectArray(Object[] carbonTuple) {
- Object[] row = new Object[measureCount + noDictionaryPresentIndex + 1];
- int index = 0;
- // put measures.
-
- for (int j = 1; j <= measureCount; j++) {
- row[index++] = carbonTuple[j];
- }
-
- // put No dictionary byte []
- if (isNoDictionaryPresent) {
- row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeys();
- }
-
- // put No Dictionary Dims
- row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getDictionaryKey();
- return row;
- }
-}
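The adapter above is pure reordering: a raw compaction tuple arrives as
[ByteArrayWrapper key, measure1 .. measureN] and the writer wants the
measures first, then the no-dictionary keys (if the table has any), then the
dictionary mdkey last. A standalone sketch of the same layout change
(toWriterRow is a hypothetical name; only the two ByteArrayWrapper getters
used above are assumed):

  import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;

  final class TupleLayoutSketch {
    static Object[] toWriterRow(Object[] carbonTuple, int measureCount,
        boolean hasNoDictionary) {
      int extra = hasNoDictionary ? 2 : 1;         // optional no-dict slot + mdkey slot
      Object[] row = new Object[measureCount + extra];
      int index = 0;
      for (int j = 1; j <= measureCount; j++) {
        row[index++] = carbonTuple[j];             // measures come first
      }
      ByteArrayWrapper key = (ByteArrayWrapper) carbonTuple[0];
      if (hasNoDictionary) {
        row[index++] = key.getNoDictionaryKeys();  // all no-dictionary keys together
      }
      row[index] = key.getDictionaryKey();         // dictionary mdkey goes last
      return row;
    }
  }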
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java
deleted file mode 100644
index fd6610c..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.merger.exeception;
-
-import java.util.Locale;
-
-public class SliceMergerException extends Exception {
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public SliceMergerException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public SliceMergerException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * This method is used to get the localized message.
- *
- * @param locale - A Locale object represents a specific geographical,
- * political, or cultural region.
- * @return - Localized error message.
- */
- public String getLocalizedMessage(Locale locale) {
- return "";
- }
-
- /**
- * getLocalizedMessage
- */
- @Override public String getLocalizedMessage() {
- return super.getLocalizedMessage();
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index f04669c..277005b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -31,9 +31,9 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
/**
* IUD carbon merger RDD
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 51f9022..350a2ec 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -46,11 +46,11 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
+import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.spark.MergeResult
import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger._
import org.apache.carbondata.spark.splits.TableSplit
class CarbonMergerRDD[K, V](
@@ -152,9 +152,14 @@ class CarbonMergerRDD[K, V](
CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
carbonLoadModel.setStorePath(hdfsStoreLocation)
-
+ // check for restructured block
+ // TODO: only in case of add and drop this variable should be true
+ val restructuredBlockExists: Boolean = CarbonCompactionUtil
+ .checkIfAnyRestructuredBlockExists(segmentMapping,
+ dataFileMetadataSegMapping,
+ carbonTable.getTableLastUpdatedTime)
exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
- carbonTable, dataFileMetadataSegMapping)
+ carbonTable, dataFileMetadataSegMapping, restructuredBlockExists)
// fire a query and get the results.
var result2: java.util.List[RawResultIterator] = null
@@ -190,17 +195,25 @@ class CarbonMergerRDD[K, V](
carbonLoadModel.setSegmentId(mergeNumber)
carbonLoadModel.setPartitionId("0")
- val merger =
- new RowResultMerger(result2,
- databaseName,
- factTableName,
+ var processor: AbstractResultProcessor = null
+ if (restructuredBlockExists) {
+ processor = new CompactionResultSortProcessor(carbonLoadModel, carbonTable,
segmentProperties,
- tempStoreLoc,
- carbonLoadModel,
- carbonMergerMapping.campactionType
+ carbonMergerMapping.campactionType,
+ factTableName
)
- mergeStatus = merger.mergerSlice()
-
+ } else {
+ processor =
+ new RowResultMergerProcessor(
+ databaseName,
+ factTableName,
+ segmentProperties,
+ tempStoreLoc,
+ carbonLoadModel,
+ carbonMergerMapping.campactionType
+ )
+ }
+ mergeStatus = processor.execute(result2)
mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
} catch {
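This hunk is the heart of the refactor: the merge strategy is now picked per
task. When any block under compaction predates a restructure (add/drop
column), the blocks no longer share one schema, so rows have to be re-sorted
while merging; otherwise the cheaper heap merge is enough. The same dispatch
transliterated to Java against the AbstractResultProcessor this commit adds
(the two processor constructors are read off the Scala call site above, so
treat their signatures as illustrative):

  import java.util.List;

  import org.apache.carbondata.core.datastore.block.SegmentProperties;
  import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
  import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
  import org.apache.carbondata.processing.merger.AbstractResultProcessor;
  import org.apache.carbondata.processing.merger.CompactionResultSortProcessor;
  import org.apache.carbondata.processing.merger.CompactionType;
  import org.apache.carbondata.processing.merger.RowResultMergerProcessor;
  import org.apache.carbondata.processing.model.CarbonLoadModel;

  final class MergeDispatchSketch {
    static boolean merge(boolean restructuredBlockExists, CarbonLoadModel loadModel,
        CarbonTable table, SegmentProperties segProps, CompactionType compactionType,
        String dbName, String factTableName, String tempStoreLoc,
        List<RawResultIterator> rows) {
      AbstractResultProcessor processor;
      if (restructuredBlockExists) {
        // blocks written across a restructure may differ in schema, so the
        // merged rows must be re-sorted
        processor = new CompactionResultSortProcessor(loadModel, table, segProps,
            compactionType, factTableName);
      } else {
        // homogeneous, already-sorted blocks only need the heap-based row merge
        processor = new RowResultMergerProcessor(dbName, factTableName, segProps,
            tempStoreLoc, loadModel, compactionType);
      }
      return processor.execute(rows);
    }
  }

RowResultMergerProcessor is the successor of the RowResultMerger deleted at
the top of this mail; execute(...) replaces the old mergerSlice() entry point.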
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 3b38028..1a237f6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -24,9 +24,8 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionType}
/**
* Compactor class which handled the compaction cases.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 0ba99a8..d6cc2e6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -33,10 +33,11 @@ import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifie
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark._
+import org.apache.carbondata.spark.compaction.CompactionCallable
import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionCallable, CompactionType}
import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index dadd03e..367bf46 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -39,10 +39,10 @@ import org.apache.carbondata.core.service.CarbonCommonFactory
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.merger.CompactionType
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.CarbonSparkFactory
import org.apache.carbondata.spark.load.FailureCauses
-import org.apache.carbondata.spark.merger.CompactionType
import org.apache.carbondata.spark.rdd.AlterTableAddColumnRDD
import org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 60742ac..4cca0a3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -50,11 +50,11 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.csvload.BlockDetails
import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index e322fc8..0e6153f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -40,8 +40,8 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
+import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
-import org.apache.carbondata.spark.merger.TableMeta
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
private[sql] case class CarbonDatasourceHadoopRelation(
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 4c5e733..2ff21c8e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -34,8 +34,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.{CarbonOption, _}
-import org.apache.carbondata.spark.merger.TableMeta
/**
* Carbon relation provider compliant to data source api.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 62e5241..a439c30 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -43,10 +43,9 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
import org.apache.carbondata.processing.exception.MultipleMatchingException
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
import org.apache.carbondata.spark.DeleteDelataResultImpl
import org.apache.carbondata.spark.load.FailureCauses
-import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil._
import org.apache.carbondata.spark.util.QueryPlanUtil
@@ -272,7 +271,7 @@ object IUDCommon {
carbonRelation: CarbonRelation,
isUpdateOperation: Boolean): Unit = {
- var ishorizontalCompaction = isHorizontalCompactionEnabled()
+ var ishorizontalCompaction = CarbonDataMergerUtil.isHorizontalCompactionEnabled()
if (ishorizontalCompaction == false) {
return
@@ -288,7 +287,7 @@ object IUDCommon {
val deleteTimeStamp = updateTimeStamp + 1
// get the valid segments
- var segLists = getValidSegmentList(absTableIdentifier)
+ var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
if (segLists == null || segLists.size() == 0) {
return
@@ -350,7 +349,7 @@ object IUDCommon {
val db = carbonTable.getDatabaseName
val table = carbonTable.getFactTableName
// get the valid segments qualified for update compaction.
- val validSegList = getSegListIUDCompactionQualified(segLists,
+ val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
absTableIdentifier,
segmentUpdateStatusManager,
compactionTypeIUD)
@@ -406,7 +405,7 @@ object IUDCommon {
val db = carbonTable.getDatabaseName
val table = carbonTable.getFactTableName
- val deletedBlocksList = getSegListIUDCompactionQualified(segLists,
+ val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
absTableIdentifier,
segmentUpdateStatusManager,
compactionTypeIUD)
@@ -436,7 +435,7 @@ object IUDCommon {
val blockName = segmentAndBlocks
.substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
- val result = compactBlockDeleteDeltaFiles(segment, blockName,
+ val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
absTableIdentifier,
updateStatusDetails,
timestamp)
@@ -453,7 +452,7 @@ object IUDCommon {
})
})
- val updateStatus = updateStatusFile(resultList.toList.asJava,
+ val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava,
carbonTable,
timestamp.toString,
segmentUpdateStatusManager)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index d5cc540..e8d3907 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -47,7 +47,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFa
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.merger.TableMeta
+import org.apache.carbondata.processing.merger.TableMeta
case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7cb5ed4..4f33043 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -50,11 +50,11 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.csvload.BlockDetails
import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 4169ac3..b9e8682 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -32,8 +32,8 @@ import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.CarbonFilters
-import org.apache.carbondata.spark.merger.TableMeta
import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.apache.carbondata.spark.util.CarbonSparkUtil
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 88ca4af..38fdb11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -50,8 +50,9 @@ private[sql] case class AlterTableAddColumns(
val dbName = alterTableAddColumnsModel.databaseName
.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
- val carbonLock = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ val locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
// get the latest carbon table and check for column existence
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
@@ -102,13 +103,7 @@ private[sql] case class AlterTableAddColumns(
sys.error("Alter table add column operation failed. Please check the logs")
} finally {
// release lock after command execution completion
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- LOGGER.info("Alter table add columns lock released successfully")
- } else {
- LOGGER.error("Unable to release lock during alter table add columns operation")
- }
- }
+ AlterTableUtil.releaseLocks(locks, LOGGER)
}
Seq.empty
}
@@ -147,15 +142,15 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
s"Table $oldDatabaseName.$oldTableName does not exist")
sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
}
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+ LockUsage.COMPACTION_LOCK,
+ LockUsage.DELETE_SEGMENT_LOCK,
+ LockUsage.CLEAN_FILES_LOCK,
+ LockUsage.DROP_TABLE_LOCK)
+ val locks = AlterTableUtil
+ .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
+ sparkSession)
val carbonTable = relation.tableMeta.carbonTable
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- if (carbonLock.lockWithRetries()) {
- LOGGER.info("Successfully able to get the table metadata file lock")
- } else {
- sys.error("Table is locked for updation. Please try after some time")
- }
try {
// get the latest carbon table and check for column existence
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -200,24 +195,15 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
sys.error("Alter table rename table operation failed. Please check the logs")
} finally {
// release lock after command execution completion
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- LOGGER.info("Lock released successfully")
- } else {
- val storeLocation = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.STORE_LOCATION,
- System.getProperty("java.io.tmpdir"))
- val lockFilePath = storeLocation + CarbonCommonConstants.FILE_SEPARATOR +
- oldDatabaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName +
- CarbonCommonConstants.FILE_SEPARATOR +
- LockUsage.METADATA_LOCK
- if(carbonLock.releaseLockManually(lockFilePath)) {
- LOGGER.info("Lock released successfully")
- } else {
- LOGGER.error("Unable to release lock during rename table")
- }
- }
- }
+ AlterTableUtil.releaseLocks(locks, LOGGER)
+ // case specific to rename table as after table rename old table path will not be found
+ AlterTableUtil
+ .releaseLocksManually(locks,
+ locksToBeAcquired,
+ oldDatabaseName,
+ newTableName,
+ carbonTable.getStorePath,
+ LOGGER)
}
Seq.empty
}
@@ -251,8 +237,9 @@ private[sql] case class AlterTableDropColumns(
val dbName = alterTableDropColumnModel.databaseName
.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
- val carbonLock = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ val locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
try {
// get the latest carbon table and check for column existence
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
@@ -333,13 +320,7 @@ private[sql] case class AlterTableDropColumns(
sys.error("Alter table drop column operation failed. Please check the logs")
} finally {
// release lock after command execution completion
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- LOGGER.info("Alter table drop columns lock released successfully")
- } else {
- LOGGER.error("Unable to release lock during alter table drop columns operation")
- }
- }
+ AlterTableUtil.releaseLocks(locks, LOGGER)
}
Seq.empty
}
@@ -355,8 +336,9 @@ private[sql] case class AlterTableDataTypeChange(
val dbName = alterTableDataTypeChangeModel.databaseName
.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
- val carbonLock = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ val locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
try {
// get the latest carbon table and check for column existence
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
@@ -416,13 +398,7 @@ private[sql] case class AlterTableDataTypeChange(
sys.error("Alter table data type change operation failed. Please check the logs")
} finally {
// release lock after command execution completion
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- LOGGER.info("Alter table change data type lock released successfully")
- } else {
- LOGGER.error("Unable to release lock during alter table change data type operation")
- }
- }
+ AlterTableUtil.releaseLocks(locks, LOGGER)
}
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index f7ea344..6460490 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -53,7 +53,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFa
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.merger.TableMeta
+import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.util.CarbonSparkUtil
case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 243eeb6..2e7eebf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import scala.collection.mutable.ListBuffer
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -27,12 +29,15 @@ import org.apache.carbondata.common.logging.LogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
object AlterTableUtil {
-
- def validateTableAndAcquireLock(dbName: String, tableName: String, LOGGER: LogService)
- (sparkSession: SparkSession): ICarbonLock = {
+ def validateTableAndAcquireLock(dbName: String,
+ tableName: String,
+ locksToBeAcquired: List[String],
+ LOGGER: LogService)
+ (sparkSession: SparkSession): List[ICarbonLock] = {
val relation =
CarbonEnv.get.carbonMetastore
.lookupRelation(Option(dbName), tableName)(sparkSession)
@@ -44,17 +49,82 @@ object AlterTableUtil {
}
// acquire the lock first
val table = relation.tableMeta.carbonTable
+ var acquiredLocks = ListBuffer[ICarbonLock]()
+ locksToBeAcquired.foreach { lock =>
+ acquiredLocks += getLockObject(table, lock, LOGGER)
+ }
+ acquiredLocks.toList
+ }
+
+ /**
+ * Given a lock type this method will return a new lock object if not acquired by any other
+ * operation
+ *
+ * @param carbonTable
+ * @param lockType
+ * @param LOGGER
+ * @return
+ */
+ private def getLockObject(carbonTable: CarbonTable,
+ lockType: String,
+ LOGGER: LogService): ICarbonLock = {
val carbonLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ lockType)
if (carbonLock.lockWithRetries()) {
- LOGGER.info("Successfully able to get the table metadata file lock")
+ LOGGER.info(s"Successfully acquired the lock $lockType")
} else {
sys.error("Table is locked for updation. Please try after some time")
}
carbonLock
}
+ /**
+ * This method will release the locks acquired for an operation
+ *
+ * @param locks
+ * @param LOGGER
+ */
+ def releaseLocks(locks: List[ICarbonLock], LOGGER: LogService): Unit = {
+ locks.foreach { carbonLock =>
+ if (carbonLock.unlock()) {
+ LOGGER.info("Alter table lock released successfully")
+ } else {
+ LOGGER.error("Unable to release lock during alter table operation")
+ }
+ }
+ }
+
+ /**
+ * This method will release the locks by manually forming a lock path. Specific usage for
+ * rename table
+ *
+ * @param locks
+ * @param locksAcquired
+ * @param dbName
+ * @param tableName
+ * @param storeLocation
+ * @param LOGGER
+ */
+ def releaseLocksManually(locks: List[ICarbonLock],
+ locksAcquired: List[String],
+ dbName: String,
+ tableName: String,
+ storeLocation: String,
+ LOGGER: LogService): Unit = {
+ val lockLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR +
+ dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName
+ locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
+ val lockFilePath = lockLocation + CarbonCommonConstants.FILE_SEPARATOR +
+ lockType
+ if (carbonLock.releaseLockManually(lockFilePath)) {
+ LOGGER.info(s"Alter table lock released successfully: ${ lockType }")
+ } else {
+ LOGGER.error("Unable to release lock during alter table operation")
+ }
+ }
+ }
+
def updateSchemaInfo(carbonTable: CarbonTable,
schemaEvolutionEntry: SchemaEvolutionEntry,
thriftTable: TableInfo)(sparkSession: SparkSession, catalog: HiveExternalCatalog): Unit = {
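One subtlety in the rename path: unlock() releases the lock file at the path
recorded when the lock was acquired, but renaming a table moves its
directory, so those paths go stale. releaseLocksManually therefore rebuilds
each lock file path from the store location and the new table name. A sketch
of that path construction, assuming only the FILE_SEPARATOR constant used
above (lockFilePath is a hypothetical helper):

  import org.apache.carbondata.core.constants.CarbonCommonConstants;

  final class LockPathSketch {
    // <store>/<database>/<table>/<lockType>, matching releaseLocksManually above
    static String lockFilePath(String storeLocation, String dbName, String tableName,
        String lockType) {
      return storeLocation + CarbonCommonConstants.FILE_SEPARATOR
          + dbName + CarbonCommonConstants.FILE_SEPARATOR
          + tableName + CarbonCommonConstants.FILE_SEPARATOR
          + lockType;
    }
  }

Each held ICarbonLock is then released with
carbonLock.releaseLockManually(lockFilePath(store, db, newTableName, lockType)).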
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 6ca8449..0d85062 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableCompaction, AlterTableModel}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.merger.CompactionType
+import org.apache.carbondata.processing.merger.CompactionType
/**
* table compaction api
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 914136c..91dd8b3 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -382,7 +382,7 @@ class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test to check if the lock file is successfully deleted") {
- sql("create table lock_check(id int, name string) stored by 'carbondata'")
+ sql("create table lock_check(id int, name string) stored by 'carbondata'")
sql("alter table lock_check rename to lock_rename")
assert(!new File(s"${ CarbonCommonConstants.STORE_LOCATION } + /default/lock_rename/meta.lock")
.exists())
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
new file mode 100644
index 0000000..f76c66f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.merger;
+
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+
+/**
+ * This class contains the common methods required for result processing during compaction,
+ * covering both the restructure and the normal scenario
+ */
+public abstract class AbstractResultProcessor {
+
+ /**
+ * This method will perform the desired tasks of merging the selected slices
+ *
+ * @param resultIteratorList iterators over the raw rows of the slices to be merged
+ * @return true if the merge completed successfully
+ */
+ public abstract boolean execute(List<RawResultIterator> resultIteratorList);
+
+ protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
+ CompactionType compactionType, CarbonTable carbonTable,
+ CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+ CarbonDataFileAttributes carbonDataFileAttributes;
+ if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+ int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
+ CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(),
+ carbonTable.getCarbonTableIdentifier()));
+ // In IUD_UPDDEL_DELTA_COMPACTION the new file is written into the same segment,
+ // so the task number must be incremented by 1 from the segment's current maximum.
+ int index = taskNo + 1;
+ carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp());
+ } else {
+ carbonDataFileAttributes =
+ new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+ loadModel.getFactTimeStamp());
+ }
+ carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+ }
+
+}
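The only branching in setDataFileAttributesInModel is the choice of task
number. A minimal sketch of that decision follows, assuming a hypothetical
IntSupplier in place of CarbonUpdateUtil.getLatestTaskIdForSegment so it runs
standalone:

import java.util.function.IntSupplier;

public class TaskNoSketch {

  enum Compaction { IUD_UPDDEL_DELTA, OTHER }

  // For IUD update/delete delta compaction the merged file lands in the same
  // segment, so the task number must move past the segment's current maximum;
  // for every other compaction type the load model's own task number is reused.
  static int chooseTaskNo(Compaction type, IntSupplier latestTaskIdInSegment,
      String loadModelTaskNo) {
    if (type == Compaction.IUD_UPDDEL_DELTA) {
      return latestTaskIdInSegment.getAsInt() + 1;
    }
    return Integer.parseInt(loadModelTaskNo);
  }

  public static void main(String[] args) {
    System.out.println(chooseTaskNo(Compaction.IUD_UPDDEL_DELTA, () -> 7, "0")); // 8
    System.out.println(chooseTaskNo(Compaction.OTHER, () -> 7, "3"));            // 3
  }
}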
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
new file mode 100644
index 0000000..c00fe2e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Executor class for executing the query on the selected segments to be merged.
+ * This will fire a select * query and get the raw result.
+ */
+public class CarbonCompactionExecutor {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
+ private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
+ private final SegmentProperties destinationSegProperties;
+ private final Map<String, TaskBlockInfo> segmentMapping;
+ private QueryExecutor queryExecutor;
+ private CarbonTable carbonTable;
+ private QueryModel queryModel;
+
+ /**
+ * flag to check whether any restructured block exists in the blocks selected for compaction.
+ * Based on this flag a decision is taken on whether the complete data has to be sorted again
+ */
+ private boolean restructuredBlockExists;
+
+ /**
+ * Constructor
+ *
+ * @param segmentMapping mapping of segment id to its task-wise block info
+ * @param segmentProperties properties of the destination segment
+ * @param carbonTable table being compacted
+ * @param dataFileMetadataSegMapping mapping of segment id to its data file footers
+ * @param restructuredBlockExists whether any selected block predates a restructure
+ */
+ public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
+ SegmentProperties segmentProperties, CarbonTable carbonTable,
+ Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
+ boolean restructuredBlockExists) {
+ this.segmentMapping = segmentMapping;
+ this.destinationSegProperties = segmentProperties;
+ this.carbonTable = carbonTable;
+ this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
+ this.restructuredBlockExists = restructuredBlockExists;
+ }
+
+ /**
+ * For processing of the table blocks.
+ *
+ * @return List of Carbon iterators
+ */
+ public List<RawResultIterator> processTableBlocks() throws QueryExecutionException, IOException {
+ List<RawResultIterator> resultList =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ List<TableBlockInfo> list = null;
+ queryModel = prepareQueryModel(list);
+ // iterate each seg ID
+ for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+ String segmentId = taskMap.getKey();
+ List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+ SegmentProperties sourceSegProperties = getSourceSegmentProperties(listMetadata);
+ // for each segment get taskblock info
+ TaskBlockInfo taskBlockInfo = taskMap.getValue();
+ Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+ for (String task : taskBlockListMapping) {
+ list = taskBlockInfo.getTableBlockInfoList(task);
+ Collections.sort(list);
+ LOGGER.info("for task -" + task + "-block size is -" + list.size());
+ queryModel.setTableBlockInfos(list);
+ resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
+ destinationSegProperties));
+ }
+ }
+ return resultList;
+ }
+
+ /**
+ * This method will create the source segment properties based on restructured block existence
+ *
+ * @param listMetadata data file footers of the source segment
+ * @return segment properties built against the latest schema
+ */
+ private SegmentProperties getSourceSegmentProperties(List<DataFileFooter> listMetadata) {
+ SegmentProperties sourceSegProperties = null;
+ if (restructuredBlockExists) {
+ // update cardinality of source segment according to new schema
+ Map<String, Integer> columnToCardinalityMap =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ CarbonCompactionUtil
+ .addColumnCardinalityToMap(columnToCardinalityMap, listMetadata.get(0).getColumnInTable(),
+ listMetadata.get(0).getSegmentInfo().getColumnCardinality());
+ List<ColumnSchema> updatedColumnSchemaList =
+ new ArrayList<>(listMetadata.get(0).getColumnInTable().size());
+ int[] updatedColumnCardinalities = CarbonCompactionUtil
+ .updateColumnSchemaAndGetCardinality(columnToCardinalityMap, carbonTable,
+ updatedColumnSchemaList);
+ sourceSegProperties =
+ new SegmentProperties(updatedColumnSchemaList, updatedColumnCardinalities);
+ } else {
+ sourceSegProperties = new SegmentProperties(listMetadata.get(0).getColumnInTable(),
+ listMetadata.get(0).getSegmentInfo().getColumnCardinality());
+ }
+ return sourceSegProperties;
+ }
+
+ /**
+ * get executor and execute the query model.
+ *
+ * @param blockList blocks to be scanned
+ * @return iterator over the raw batch results
+ */
+ private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+ throws QueryExecutionException, IOException {
+ queryModel.setTableBlockInfos(blockList);
+ this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ return queryExecutor.execute(queryModel);
+ }
+
+ /**
+ * Below method will be used for cleanup: it finishes the query executor
+ * and clears the cached dictionaries
+ */
+ public void finish() {
+ try {
+ queryExecutor.finish();
+ } catch (QueryExecutionException e) {
+ LOGGER.error(e, "Problem while finishing the query executor");
+ }
+ clearDictionaryFromQueryModel();
+ }
+
+ /**
+ * This method will clear the dictionary access count after its usage is complete so
+ * that the column can be deleted from the LRU cache whenever memory reaches its threshold
+ */
+ private void clearDictionaryFromQueryModel() {
+ if (null != queryModel) {
+ Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+ if (null != columnToDictionaryMapping) {
+ for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+ CarbonUtil.clearDictionaryCache(entry.getValue());
+ }
+ }
+ }
+ }
+
+ /**
+ * Prepares the query model for a forced detail raw query over all dimensions and measures.
+ *
+ * @param blockList blocks to be set on the model (may be null and set later per task)
+ * @return the prepared query model
+ */
+ private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+ QueryModel model = new QueryModel();
+ model.setTableBlockInfos(blockList);
+ model.setForcedDetailRawQuery(true);
+ model.setFilterExpressionResolverTree(null);
+
+ List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ for (CarbonDimension dim : dimensions) {
+ // check if dimension is deleted
+ QueryDimension queryDimension = new QueryDimension(dim.getColName());
+ queryDimension.setDimension(dim);
+ dims.add(queryDimension);
+ }
+ model.setQueryDimension(dims);
+
+ List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ List<CarbonMeasure> measures =
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ for (CarbonMeasure carbonMeasure : measures) {
+ // check if measure is deleted
+ QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+ queryMeasure.setMeasure(carbonMeasure);
+ msrs.add(queryMeasure);
+ }
+ model.setQueryMeasures(msrs);
+ model.setQueryId(System.nanoTime() + "");
+ model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
+ model.setTable(carbonTable);
+ return model;
+ }
+
+}
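processTableBlocks boils down to one RawResultIterator per (segment, task)
pair, with each task's block list sorted before the scan is fired. A sketch of
that nesting with plain collections, where runQuery is a hypothetical stand-in
for executeBlockList:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class CompactionScanSketch {

  // One result handle per (segment, task) pair: iterate segments, then tasks,
  // sort each task's blocks, and fire one scan over the sorted list.
  static <B extends Comparable<B>, R> List<R> scanAll(
      Map<String, Map<String, List<B>>> segmentToTaskBlocks,
      Function<List<B>, R> runQuery) {
    List<R> results = new ArrayList<>();
    for (Map.Entry<String, Map<String, List<B>>> seg : segmentToTaskBlocks.entrySet()) {
      for (Map.Entry<String, List<B>> task : seg.getValue().entrySet()) {
        List<B> blocks = task.getValue();
        Collections.sort(blocks); // blocks within a task are scanned in order
        results.add(runQuery.apply(blocks));
      }
    }
    return results;
  }
}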
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
new file mode 100644
index 0000000..2ad83a4
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * Utility Class for the Compaction Flow.
+ */
+public class CarbonCompactionUtil {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonCompactionUtil.class.getName());
+
+ /**
+ * To create a mapping of Segment Id and TableBlockInfo.
+ *
+ * @param tableBlockInfoList blocks selected for compaction
+ * @return mapping of segment id to its task-wise block info
+ */
+ public static Map<String, TaskBlockInfo> createMappingForSegments(
+ List<TableBlockInfo> tableBlockInfoList) {
+
+ // stores taskBlockInfo of each segment
+ Map<String, TaskBlockInfo> segmentBlockInfoMapping =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ for (TableBlockInfo info : tableBlockInfoList) {
+ String segId = info.getSegmentId();
+ // check if segId is already present in map
+ TaskBlockInfo taskBlockInfoMapping = segmentBlockInfoMapping.get(segId);
+ // extract task ID from file Path.
+ String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(info.getFilePath());
+ // if taskBlockInfo is not there, then create and add
+ if (null == taskBlockInfoMapping) {
+ taskBlockInfoMapping = new TaskBlockInfo();
+ groupCorrespondingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+ // put the taskBlockInfo with respective segment id
+ segmentBlockInfoMapping.put(segId, taskBlockInfoMapping);
+ } else {
+ groupCorrespondingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+ }
+ }
+ return segmentBlockInfoMapping;
+
+ }
+
+ /**
+ * Groups a TableBlockInfo under its corresponding task number.
+ * @param info block to be grouped
+ * @param taskBlockMapping task-wise grouping for the block's segment
+ * @param taskNo task number extracted from the block's file path
+ */
+ private static void groupCorrespondingInfoBasedOnTask(TableBlockInfo info,
+ TaskBlockInfo taskBlockMapping, String taskNo) {
+ // get the corresponding list from task mapping.
+ List<TableBlockInfo> blockLists = taskBlockMapping.getTableBlockInfoList(taskNo);
+ if (null != blockLists) {
+ blockLists.add(info);
+ } else {
+ blockLists = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ blockLists.add(info);
+ taskBlockMapping.addTableBlockInfoList(taskNo, blockLists);
+ }
+ }
+
+ /**
+ * To create a mapping of Segment Id and DataFileFooter.
+ *
+ * @param tableBlockInfoList blocks selected for compaction
+ * @return mapping of segment id to the footers of its data files
+ */
+ public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
+ List<TableBlockInfo> tableBlockInfoList) throws IOException {
+
+ Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
+ for (TableBlockInfo blockInfo : tableBlockInfoList) {
+ List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
+ String segId = blockInfo.getSegmentId();
+ DataFileFooter dataFileMetadata = null;
+ // check if segId is already present in map
+ List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
+ dataFileMetadata = CarbonUtil.readMetadatFile(blockInfo);
+ if (null == metadataList) {
+ // if it is not present
+ eachSegmentBlocks.add(dataFileMetadata);
+ segmentBlockInfoMapping.put(segId, eachSegmentBlocks);
+ } else {
+ // if it is already present then update the list.
+ metadataList.add(dataFileMetadata);
+ }
+ }
+ return segmentBlockInfoMapping;
+
+ }
+
+ /**
+ * Checks whether a file indicating a pending compaction request is present.
+ * @param metaFolderPath metadata folder path of the table
+ * @return true if a minor or major compaction request file exists
+ */
+ public static boolean isCompactionRequiredForTable(String metaFolderPath) {
+ String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.minorCompactionRequiredFile;
+
+ String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.majorCompactionRequiredFile;
+ try {
+ if (FileFactory.isFileExist(minorCompactionStatusFile,
+ FileFactory.getFileType(minorCompactionStatusFile)) || FileFactory
+ .isFileExist(majorCompactionStatusFile,
+ FileFactory.getFileType(majorCompactionStatusFile))) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception in isFileExist compaction request file " + e.getMessage());
+ }
+ return false;
+ }
+
+ /**
+ * Determines the type of the compaction request received.
+ * @param metaFolderPath metadata folder path of the table
+ * @return the requested compaction type (minor by default)
+ */
+ public static CompactionType determineCompactionType(String metaFolderPath) {
+ String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.minorCompactionRequiredFile;
+
+ String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.majorCompactionRequiredFile;
+ try {
+ if (FileFactory.isFileExist(minorCompactionStatusFile,
+ FileFactory.getFileType(minorCompactionStatusFile))) {
+ return CompactionType.MINOR_COMPACTION;
+ }
+ if (FileFactory.isFileExist(majorCompactionStatusFile,
+ FileFactory.getFileType(majorCompactionStatusFile))) {
+ return CompactionType.MAJOR_COMPACTION;
+ }
+
+ } catch (IOException e) {
+ LOGGER.error("Exception in determining the compaction request file " + e.getMessage());
+ }
+ return CompactionType.MINOR_COMPACTION;
+ }
+
+ /**
+ * Deletes the compaction request file once the compaction is done.
+ * @param metaFolderPath metadata folder path of the table
+ * @param compactionType type of the completed compaction
+ * @return true if the request file was deleted
+ */
+ public static boolean deleteCompactionRequiredFile(String metaFolderPath,
+ CompactionType compactionType) {
+ String compactionRequiredFile;
+ if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+ compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.minorCompactionRequiredFile;
+ } else {
+ compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.majorCompactionRequiredFile;
+ }
+ try {
+ if (FileFactory
+ .isFileExist(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))) {
+ if (FileFactory
+ .getCarbonFile(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))
+ .delete()) {
+ LOGGER.info("Deleted the compaction request file " + compactionRequiredFile);
+ return true;
+ } else {
+ LOGGER.error("Unable to delete the compaction request file " + compactionRequiredFile);
+ }
+ } else {
+ LOGGER.info("Compaction request file is not present. file is : " + compactionRequiredFile);
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception in deleting the compaction request file " + e.getMessage());
+ }
+ return false;
+ }
+
+ /**
+ * Creates a compaction request file when some other compaction is already in progress.
+ * @param metaFolderPath metadata folder path of the table
+ * @param compactionType type of the requested compaction
+ * @return true if the request file was newly created
+ */
+ public static boolean createCompactionRequiredFile(String metaFolderPath,
+ CompactionType compactionType) {
+ String statusFile;
+ if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+ statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.minorCompactionRequiredFile;
+ } else {
+ statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.majorCompactionRequiredFile;
+ }
+ try {
+ if (!FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
+ if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
+ LOGGER.info("successfully created a compaction required file - " + statusFile);
+ return true;
+ } else {
+ LOGGER.error("Not able to create a compaction required file - " + statusFile);
+ return false;
+ }
+ } else {
+ LOGGER.info("Compaction request file : " + statusFile + " already exist.");
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception in creating the compaction request file " + e.getMessage());
+ }
+ return false;
+ }
+
+ /**
+ * This will check if any compaction request has been received for any table.
+ *
+ * @param tableMetas metadata of all candidate tables
+ * @param skipList tables to be excluded from this round of compaction
+ * @return the next table with a pending compaction request, or null if none
+ */
+ public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+ List<CarbonTableIdentifier> skipList) {
+ for (TableMeta table : tableMetas) {
+ CarbonTable ctable = table.carbonTable;
+ String metadataPath = ctable.getMetaDataFilepath();
+ // check for the compaction required file and at the same time exclude the tables which are
+ // present in the skip list.
+ if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
+ .contains(table.carbonTableIdentifier)) {
+ return table;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * This method will prepare the map of maximum column cardinalities
+ *
+ * @param columnCardinalityMap running map of column unique id to maximum cardinality
+ * @param currentBlockSchema column schemas of the current block
+ * @param currentBlockCardinality column cardinalities of the current block
+ */
+ public static void addColumnCardinalityToMap(Map<String, Integer> columnCardinalityMap,
+ List<ColumnSchema> currentBlockSchema, int[] currentBlockCardinality) {
+ for (int i = 0; i < currentBlockCardinality.length; i++) {
+ // add the value to the map only if it is absent or the new cardinality is greater
+ String columnUniqueId = currentBlockSchema.get(i).getColumnUniqueId();
+ Integer value = columnCardinalityMap.get(columnUniqueId);
+ if (null == value) {
+ columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
+ } else {
+ if (currentBlockCardinality[i] > value) {
+ columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
+ }
+ }
+ }
+ }
+
+ /**
+ * This method will return the updated cardinality according to the master schema
+ *
+ * @param columnCardinalityMap map of column unique id to maximum observed cardinality
+ * @param carbonTable table whose master schema drives the column ordering
+ * @param updatedColumnSchemaList output list to be filled with the updated column schemas
+ * @return cardinalities aligned with the updated column schema list
+ */
+ public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
+ CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
+ List<CarbonDimension> masterDimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
+ for (CarbonDimension dimension : masterDimensions) {
+ Integer value = columnCardinalityMap.get(dimension.getColumnId());
+ if (null == value) {
+ updatedCardinalityList.add(getDimensionDefaultCardinality(dimension));
+ } else {
+ updatedCardinalityList.add(value);
+ }
+ updatedColumnSchemaList.add(dimension.getColumnSchema());
+ }
+ // add measures to the column schema list
+ List<CarbonMeasure> masterSchemaMeasures =
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ for (CarbonMeasure measure : masterSchemaMeasures) {
+ updatedColumnSchemaList.add(measure.getColumnSchema());
+ }
+ int[] updatedCardinality = ArrayUtils
+ .toPrimitive(updatedCardinalityList.toArray(new Integer[updatedCardinalityList.size()]));
+ return updatedCardinality;
+ }
+
+ /**
+ * This method will return the default cardinality based on dimension type
+ *
+ * @param dimension dimension whose default cardinality is required
+ * @return default cardinality for the dimension's encoding
+ */
+ private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
+ int cardinality = 0;
+ if (dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ cardinality = Integer.MAX_VALUE;
+ } else if (dimension.hasEncoding(Encoding.DICTIONARY)) {
+ if (null != dimension.getDefaultValue()) {
+ cardinality = CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1;
+ } else {
+ cardinality = CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY;
+ }
+ } else {
+ cardinality = -1;
+ }
+ return cardinality;
+ }
+
+ /**
+ * This method will check for any restructured block in the blocks selected for compaction
+ *
+ * @param segmentMapping mapping of segment id to its task-wise block info
+ * @param dataFileMetadataSegMapping mapping of segment id to its data file footers
+ * @param tableLastUpdatedTime last schema-updated timestamp of the table
+ * @return true if any selected block was written before the latest schema update
+ */
+ public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+ Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
+ boolean restructuredBlockExists = false;
+ for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+ String segmentId = taskMap.getKey();
+ List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+ for (DataFileFooter dataFileFooter : listMetadata) {
+ // if schema modified timestamp is greater than footer stored schema timestamp,
+ // it indicates it is a restructured block
+ if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
+ restructuredBlockExists = true;
+ break;
+ }
+ }
+ if (restructuredBlockExists) {
+ break;
+ }
+ }
+ return restructuredBlockExists;
+ }
+}
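The cardinality handling in this utility reduces to two rules: across all
selected blocks, keep the maximum observed cardinality per column unique id
(addColumnCardinalityToMap), and for master-schema columns absent from every
block, fall back to an encoding-based default (updateColumnSchemaAndGetCardinality).
A minimal sketch of the max-merge rule, using plain strings for column ids:

import java.util.HashMap;
import java.util.Map;

public class CardinalitySketch {

  // Merge one block's cardinalities into the running map, keeping the maximum
  // value seen so far for each column unique id.
  static void mergeBlock(Map<String, Integer> maxByColumn,
      String[] columnIds, int[] cardinalities) {
    for (int i = 0; i < cardinalities.length; i++) {
      maxByColumn.merge(columnIds[i], cardinalities[i], Math::max);
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> maxByColumn = new HashMap<>();
    mergeBlock(maxByColumn, new String[] {"c1", "c2"}, new int[] {10, 5});
    mergeBlock(maxByColumn, new String[] {"c1", "c2"}, new int[] {7, 9});
    System.out.println(maxByColumn); // prints {c1=10, c2=9} (map order may vary)
  }
}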