Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/06 10:49:47 UTC

[5/7] incubator-carbondata git commit: The compaction lock should also be acquired during an alter operation, as alter and compaction on the same table must not run concurrently.

The compaction lock should also be acquired during an alter operation, as alter and compaction on the same table must not run concurrently.
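
A minimal sketch of the locking pattern this enforces on the alter path (illustrative only: it assumes the getCarbonLockObj(...) factory, a LockUsage.COMPACTION_LOCK constant, and the ICarbonLock lockWithRetries()/unlock() contract used elsewhere in this codebase):

    // refuse to alter while a compaction holds the table's compaction lock
    ICarbonLock compactionLock =
        CarbonLockFactory.getCarbonLockObj(tableIdentifier, LockUsage.COMPACTION_LOCK);
    if (!compactionLock.lockWithRetries()) {
      throw new RuntimeException("Alter rejected: compaction is in progress on this table");
    }
    try {
      // ... perform the alter operation ...
    } finally {
      compactionLock.unlock();
    }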

Handle compaction for the restructure case: if any restructured block is selected for compaction, sort the data completely again.
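
Roughly, the dispatch this adds (a sketch: the processor names come from the diffstat below, but the restructure check and the constructor argument lists are assumptions, not the exact signatures):

    AbstractResultProcessor processor;
    if (restructuredBlockExists) {
      // schema changed after some selected blocks were written, so the merged
      // data may no longer be globally sorted: sort it completely again
      processor = new CompactionResultSortProcessor(loadModel, carbonTable,
          segmentProperties, compactionType, tableName);
    } else {
      // all blocks share one schema and sort order: merge the sorted streams
      processor = new RowResultMergerProcessor(databaseName, tableName,
          segmentProperties, tempStoreLocation, loadModel, colCardinality);
    }
    processor.execute(rawResultIteratorList);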

Handled review comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/cc59b247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/cc59b247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/cc59b247

Branch: refs/heads/master
Commit: cc59b247aff7a6fd5d164d83dfe01890cfcdb2e1
Parents: 0f0907a
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Mar 15 19:24:05 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 6 16:17:24 2017 +0530

----------------------------------------------------------------------
 .../core/locks/CarbonLockFactory.java           |    8 +
 .../DictionaryBasedVectorResultCollector.java   |   12 +-
 .../RestructureBasedRawResultCollector.java     |    2 -
 .../spark/compaction/CompactionCallable.java    |   44 +
 .../carbondata/spark/load/CarbonLoaderUtil.java |   19 +-
 .../spark/merger/CarbonCompactionExecutor.java  |  211 ---
 .../spark/merger/CarbonCompactionUtil.java      |  354 -----
 .../spark/merger/CarbonDataMergerUtil.java      | 1371 -----------------
 .../merger/CarbonDataMergerUtilResult.java      |   33 -
 .../spark/merger/CompactionCallable.java        |   44 -
 .../merger/CompactionResultSortProcessor.java   |  563 -------
 .../carbondata/spark/merger/CompactionType.java |   31 -
 .../spark/merger/NodeBlockRelation.java         |   58 -
 .../spark/merger/NodeMultiBlockRelation.java    |   57 -
 .../spark/merger/RowResultMerger.java           |  367 -----
 .../carbondata/spark/merger/TableMeta.java      |   40 -
 .../spark/merger/TupleConversionAdapter.java    |   67 -
 .../merger/exeception/SliceMergerException.java |   78 -
 .../spark/rdd/CarbonIUDMergerRDD.scala          |    2 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   37 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |    3 +-
 .../spark/rdd/DataManagementFunc.scala          |    3 +-
 .../execution/command/carbonTableSchema.scala   |    2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |    2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |    2 +-
 .../spark/sql/CarbonDatasourceRelation.scala    |    2 +-
 .../sql/execution/command/IUDCommands.scala     |   15 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |    2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |    2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |    2 +-
 .../execution/command/AlterTableCommands.scala  |   82 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |    2 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |   82 +-
 .../org/apache/spark/util/Compaction.scala      |    2 +-
 .../AlterTableValidationTestCase.scala          |    2 +-
 .../merger/AbstractResultProcessor.java         |   64 +
 .../merger/CarbonCompactionExecutor.java        |  231 +++
 .../processing/merger/CarbonCompactionUtil.java |  383 +++++
 .../processing/merger/CarbonDataMergerUtil.java | 1385 ++++++++++++++++++
 .../merger/CarbonDataMergerUtilResult.java      |   33 +
 .../merger/CompactionResultSortProcessor.java   |  401 +++++
 .../processing/merger/CompactionType.java       |   31 +
 .../processing/merger/NodeBlockRelation.java    |   58 +
 .../merger/NodeMultiBlockRelation.java          |   57 +
 .../merger/RowResultMergerProcessor.java        |  233 +++
 .../carbondata/processing/merger/TableMeta.java |   40 +
 .../merger/TupleConversionAdapter.java          |   67 +
 .../merger/exeception/SliceMergerException.java |   78 +
 .../newflow/DataLoadProcessBuilder.java         |    4 +-
 .../steps/DataWriterProcessorStepImpl.java      |   37 +-
 .../sortandgroupby/sortdata/SortParameters.java |    3 +-
 .../store/CarbonFactDataHandlerModel.java       |   72 +
 .../util/CarbonDataProcessorUtil.java           |   75 +
 53 files changed, 3427 insertions(+), 3428 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
index 8cc494e..f9520ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.locks;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -28,6 +30,11 @@ import org.apache.carbondata.core.util.CarbonProperties;
 public class CarbonLockFactory {
 
   /**
+   * Attribute for LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonLockFactory.class.getName());
+  /**
    * lockTypeConfigured to check if zookeeper feature is enabled or not for carbon.
    */
   private static String lockTypeConfigured;
@@ -88,6 +95,7 @@ public class CarbonLockFactory {
   private static void getLockTypeConfigured() {
     lockTypeConfigured = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT);
+    LOGGER.info("Configured lock type is: " + lockTypeConfigured);
   }
 
 }
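
A short usage note (a sketch: the "ZOOKEEPERLOCK" value string is an assumption about the supported lock types):

    // choose the lock implementation before any lock is requested; the new
    // log line above will then report which type the factory picked up
    CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.LOCK_TYPE, "ZOOKEEPERLOCK");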

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 82eaac7..082874d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -51,10 +51,14 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
 
   public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
-    queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
-    queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
-    allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length];
-    prepareDimensionAndMeasureColumnVectors();
+    // initialize only if the current block is not a restructured block else the initialization
+    // will be taken care by RestructureBasedVectorResultCollector
+    if (!blockExecutionInfos.isRestructuredBlock()) {
+      queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+      queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
+      allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length];
+      prepareDimensionAndMeasureColumnVectors();
+    }
   }
 
   protected void prepareDimensionAndMeasureColumnVectors() {
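
The guard matters because, for a restructured block, the columns the query asks for differ from the columns physically present in the block. A sketch of how the subclass is expected to take over the skipped initialization (illustrative: the real RestructureBasedVectorResultCollector is not part of this hunk, and getActualQueryDimensions() is assumed by analogy with the getActualQueryMeasures() call visible below):

    public class RestructureBasedVectorResultCollector
        extends DictionaryBasedVectorResultCollector {
      public RestructureBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
        super(blockExecutionInfos);  // skips init: isRestructuredBlock() is true
        // initialize from the actual query columns instead, so that columns
        // missing from this old block can be filled with default values
        queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
        queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
        allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length];
        prepareDimensionAndMeasureColumnVectors();
      }
    }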

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 782ffb2..fef5cc9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryDimension;
 import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
-import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -153,7 +152,6 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
-    ByteArrayWrapper wrapper = null;
     BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
         scannedResult.getDeleteDeltaDataCache();
     // scan the record and add to list

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
new file mode 100644
index 0000000..2773eef
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.compaction;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.spark.rdd.Compactor;
+
+import org.apache.spark.sql.execution.command.CompactionCallableModel;
+
+/**
+ * Callable class which is used to trigger the compaction in a separate callable.
+ */
+public class CompactionCallable implements Callable<Void> {
+
+  private final CompactionCallableModel compactionCallableModel;
+
+  public CompactionCallable(CompactionCallableModel compactionCallableModel) {
+
+    this.compactionCallableModel = compactionCallableModel;
+  }
+
+  @Override public Void call() throws Exception {
+
+    Compactor.triggerCompaction(compactionCallableModel);
+    return null;
+
+  }
+}
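
Typical usage (a sketch: building the CompactionCallableModel is caller-specific and elided here):

    // run the compaction on a dedicated thread; Callable<Void> lets any
    // compaction failure propagate through Future.get() as an ExecutionException
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    try {
      Future<Void> future = executorService.submit(new CompactionCallable(model));
      future.get();  // blocks until the compaction finishes or fails
    } finally {
      executorService.shutdown();
    }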

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 95f0b10..036e574 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -70,9 +70,9 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.merger.NodeBlockRelation;
+import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.spark.merger.NodeBlockRelation;
-import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
 
 import com.google.gson.Gson;
 import org.apache.spark.SparkConf;
@@ -203,21 +203,6 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * This method will get the store location for the given path, segemnt id and partition id
-   *
-   * @param storePath
-   * @param carbonTableIdentifier
-   * @param segmentId
-   * @return
-   */
-  public static String getStoreLocation(String storePath,
-      CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
-    return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
-  }
-
-  /**
    * This API will write the load level metadata for the loadmanagement module inorder to
    * manage the load and query execution management smoothly.
    *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
deleted file mode 100644
index 04fdc84..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
-import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Executor class for executing the query on the selected segments to be merged.
- * This will fire a select * query and get the raw result.
- */
-public class CarbonCompactionExecutor {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
-  private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
-  private final SegmentProperties destinationSegProperties;
-  private final Map<String, TaskBlockInfo> segmentMapping;
-  private QueryExecutor queryExecutor;
-  private CarbonTable carbonTable;
-  private QueryModel queryModel;
-
-  /**
-   * Constructor
-   *  @param segmentMapping
-   * @param segmentProperties
-   * @param carbonTable
-   */
-  public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
-      SegmentProperties segmentProperties, CarbonTable carbonTable,
-      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping) {
-    this.segmentMapping = segmentMapping;
-    this.destinationSegProperties = segmentProperties;
-    this.carbonTable = carbonTable;
-    this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
-  }
-
-  /**
-   * For processing of the table blocks.
-   *
-   * @return List of Carbon iterators
-   */
-  public List<RawResultIterator> processTableBlocks() throws QueryExecutionException, IOException {
-
-    List<RawResultIterator> resultList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<TableBlockInfo> list = null;
-    queryModel = prepareQueryModel(list);
-    // iterate each seg ID
-    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
-      String segmentId = taskMap.getKey();
-      List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-
-      // update cardinality of source segment according to new schema
-      Map<String, Integer> columnToCardinalityMap =
-          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-      CarbonCompactionUtil
-          .addColumnCardinalityToMap(columnToCardinalityMap, listMetadata.get(0).getColumnInTable(),
-              listMetadata.get(0).getSegmentInfo().getColumnCardinality());
-      List<ColumnSchema> updatedColumnSchemaList =
-          new ArrayList<>(listMetadata.get(0).getColumnInTable().size());
-      int[] updatedColumnCardinalities = CarbonCompactionUtil
-          .updateColumnSchemaAndGetCardinality(columnToCardinalityMap, carbonTable,
-              updatedColumnSchemaList);
-      SegmentProperties sourceSegProperties =
-          new SegmentProperties(updatedColumnSchemaList, updatedColumnCardinalities);
-
-      // for each segment get taskblock info
-      TaskBlockInfo taskBlockInfo = taskMap.getValue();
-      Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
-
-      for (String task : taskBlockListMapping) {
-
-        list = taskBlockInfo.getTableBlockInfoList(task);
-        Collections.sort(list);
-        LOGGER.info("for task -" + task + "-block size is -" + list.size());
-        queryModel.setTableBlockInfos(list);
-        resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
-            destinationSegProperties));
-
-      }
-    }
-
-    return resultList;
-  }
-
-  /**
-   * get executor and execute the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
-      throws QueryExecutionException, IOException {
-    queryModel.setTableBlockInfos(blockList);
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
-    return queryExecutor.execute(queryModel);
-  }
-
-  /**
-   * Below method will be used
-   * for cleanup
-   */
-  public void finish() {
-    try {
-      queryExecutor.finish();
-    } catch (QueryExecutionException e) {
-      LOGGER.error(e, "Problem while finish: ");
-    }
-    clearDictionaryFromQueryModel();
-  }
-
-  /**
-   * This method will clear the dictionary access count after its usage is complete so
-   * that column can be deleted form LRU cache whenever memory reaches threshold
-   */
-  private void clearDictionaryFromQueryModel() {
-    if (null != queryModel) {
-      Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
-      if (null != columnToDictionaryMapping) {
-        for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
-          CarbonUtil.clearDictionaryCache(entry.getValue());
-        }
-      }
-    }
-  }
-
-  /**
-   * Preparing of the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-    QueryModel model = new QueryModel();
-    model.setTableBlockInfos(blockList);
-    model.setForcedDetailRawQuery(true);
-    model.setFilterExpressionResolverTree(null);
-
-    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
-    for (CarbonDimension dim : dimensions) {
-      // check if dimension is deleted
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      queryDimension.setDimension(dim);
-      dims.add(queryDimension);
-    }
-    model.setQueryDimension(dims);
-
-    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
-    for (CarbonMeasure carbonMeasure : measures) {
-      // check if measure is deleted
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      queryMeasure.setMeasure(carbonMeasure);
-      msrs.add(queryMeasure);
-    }
-    model.setQueryMeasures(msrs);
-    model.setQueryId(System.nanoTime() + "");
-    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-    model.setTable(carbonTable);
-    return model;
-  }
-
-}
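
For reference, the life cycle of this class as a caller would drive it (a sketch assembled from the methods above and the mapping helpers in CarbonCompactionUtil below):

    // build the per-segment mappings from the blocks selected for merge
    Map<String, TaskBlockInfo> segMapping =
        CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList);
    Map<String, List<DataFileFooter>> footerMapping =
        CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList);

    CarbonCompactionExecutor executor = new CarbonCompactionExecutor(
        segMapping, destinationSegProperties, carbonTable, footerMapping);
    try {
      // one raw iterator per (segment, task); these feed the result merger
      List<RawResultIterator> rawIterators = executor.processTableBlocks();
      // ... merge rawIterators into the destination segment ...
    } finally {
      executor.finish();  // closes the query executor and clears cached dictionaries
    }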

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
deleted file mode 100644
index ba9c4a8..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.lang3.ArrayUtils;
-
-/**
- * Utility Class for the Compaction Flow.
- */
-public class CarbonCompactionUtil {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
-
-  /**
-   * To create a mapping of Segment Id and TableBlockInfo.
-   *
-   * @param tableBlockInfoList
-   * @return
-   */
-  public static Map<String, TaskBlockInfo> createMappingForSegments(
-      List<TableBlockInfo> tableBlockInfoList) {
-
-    // stores taskBlockInfo of each segment
-    Map<String, TaskBlockInfo> segmentBlockInfoMapping =
-        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-
-    for (TableBlockInfo info : tableBlockInfoList) {
-      String segId = info.getSegmentId();
-      // check if segId is already present in map
-      TaskBlockInfo taskBlockInfoMapping = segmentBlockInfoMapping.get(segId);
-      // extract task ID from file Path.
-      String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(info.getFilePath());
-      // if taskBlockInfo is not there, then create and add
-      if (null == taskBlockInfoMapping) {
-        taskBlockInfoMapping = new TaskBlockInfo();
-        groupCorrespodingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
-        // put the taskBlockInfo with respective segment id
-        segmentBlockInfoMapping.put(segId, taskBlockInfoMapping);
-      } else
-      {
-        groupCorrespodingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
-      }
-    }
-    return segmentBlockInfoMapping;
-
-  }
-
-  /**
-   * Grouping the taskNumber and list of TableBlockInfo.
-   * @param info
-   * @param taskBlockMapping
-   * @param taskNo
-   */
-  private static void groupCorrespodingInfoBasedOnTask(TableBlockInfo info,
-      TaskBlockInfo taskBlockMapping, String taskNo) {
-    // get the corresponding list from task mapping.
-    List<TableBlockInfo> blockLists = taskBlockMapping.getTableBlockInfoList(taskNo);
-    if (null != blockLists) {
-      blockLists.add(info);
-    } else {
-      blockLists = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-      blockLists.add(info);
-      taskBlockMapping.addTableBlockInfoList(taskNo, blockLists);
-    }
-  }
-
-  /**
-   * To create a mapping of Segment Id and DataFileFooter.
-   *
-   * @param tableBlockInfoList
-   * @return
-   */
-  public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
-      List<TableBlockInfo> tableBlockInfoList) throws IOException {
-
-    Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
-    for (TableBlockInfo blockInfo : tableBlockInfoList) {
-      List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
-      String segId = blockInfo.getSegmentId();
-      DataFileFooter dataFileMatadata = null;
-      // check if segId is already present in map
-      List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
-      dataFileMatadata = CarbonUtil.readMetadatFile(blockInfo);
-      if (null == metadataList) {
-        // if it is not present
-        eachSegmentBlocks.add(dataFileMatadata);
-        segmentBlockInfoMapping.put(segId, eachSegmentBlocks);
-      } else {
-        // if its already present then update the list.
-        metadataList.add(dataFileMatadata);
-      }
-    }
-    return segmentBlockInfoMapping;
-
-  }
-
-  /**
-   * Check whether the file to indicate the compaction is present or not.
-   * @param metaFolderPath
-   * @return
-   */
-  public static boolean isCompactionRequiredForTable(String metaFolderPath) {
-    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.minorCompactionRequiredFile;
-
-    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.majorCompactionRequiredFile;
-    try {
-      if (FileFactory.isFileExist(minorCompactionStatusFile,
-          FileFactory.getFileType(minorCompactionStatusFile)) || FileFactory
-          .isFileExist(majorCompactionStatusFile,
-              FileFactory.getFileType(majorCompactionStatusFile))) {
-        return true;
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception in isFileExist compaction request file " + e.getMessage());
-    }
-    return false;
-  }
-
-  /**
-   * Determine the type of the compaction received.
-   * @param metaFolderPath
-   * @return
-   */
-  public static CompactionType determineCompactionType(String metaFolderPath) {
-    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.minorCompactionRequiredFile;
-
-    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.majorCompactionRequiredFile;
-    try {
-      if (FileFactory.isFileExist(minorCompactionStatusFile,
-          FileFactory.getFileType(minorCompactionStatusFile))) {
-        return CompactionType.MINOR_COMPACTION;
-      }
-      if (FileFactory.isFileExist(majorCompactionStatusFile,
-          FileFactory.getFileType(majorCompactionStatusFile))) {
-        return CompactionType.MAJOR_COMPACTION;
-      }
-
-    } catch (IOException e) {
-      LOGGER.error("Exception in determining the compaction request file " + e.getMessage());
-    }
-    return CompactionType.MINOR_COMPACTION;
-  }
-
-  /**
-   * Delete the compation request file once the compaction is done.
-   * @param metaFolderPath
-   * @param compactionType
-   * @return
-   */
-  public static boolean deleteCompactionRequiredFile(String metaFolderPath,
-      CompactionType compactionType) {
-    String compactionRequiredFile;
-    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
-      compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonCommonConstants.minorCompactionRequiredFile;
-    } else {
-      compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonCommonConstants.majorCompactionRequiredFile;
-    }
-    try {
-      if (FileFactory
-          .isFileExist(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))) {
-        if (FileFactory
-            .getCarbonFile(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))
-            .delete()) {
-          LOGGER.info("Deleted the compaction request file " + compactionRequiredFile);
-          return true;
-        } else {
-          LOGGER.error("Unable to delete the compaction request file " + compactionRequiredFile);
-        }
-      } else {
-        LOGGER.info("Compaction request file is not present. file is : " + compactionRequiredFile);
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception in deleting the compaction request file " + e.getMessage());
-    }
-    return false;
-  }
-
-  /**
-   * Creation of the compaction request if someother compaction is in progress.
-   * @param metaFolderPath
-   * @param compactionType
-   * @return
-   */
-  public static boolean createCompactionRequiredFile(String metaFolderPath,
-      CompactionType compactionType) {
-    String statusFile;
-    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
-      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonCommonConstants.minorCompactionRequiredFile;
-    } else {
-      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonCommonConstants.majorCompactionRequiredFile;
-    }
-    try {
-      if (!FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
-        if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
-          LOGGER.info("successfully created a compaction required file - " + statusFile);
-          return true;
-        } else {
-          LOGGER.error("Not able to create a compaction required file - " + statusFile);
-          return false;
-        }
-      } else {
-        LOGGER.info("Compaction request file : " + statusFile + " already exist.");
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception in creating the compaction request file " + e.getMessage());
-    }
-    return false;
-  }
-
-  /**
-   * This will check if any compaction request has been received for any table.
-   *
-   * @param tableMetas
-   * @return
-   */
-  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
-      List<CarbonTableIdentifier> skipList) {
-    for (TableMeta table : tableMetas) {
-      CarbonTable ctable = table.carbonTable;
-      String metadataPath = ctable.getMetaDataFilepath();
-      // check for the compaction required file and at the same time exclude the tables which are
-      // present in the skip list.
-      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
-          .contains(table.carbonTableIdentifier)) {
-        return table;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * This method will add the prepare the max column cardinality map
-   *
-   * @param columnCardinalityMap
-   * @param currentBlockSchema
-   * @param currentBlockCardinality
-   */
-  public static void addColumnCardinalityToMap(Map<String, Integer> columnCardinalityMap,
-      List<ColumnSchema> currentBlockSchema, int[] currentBlockCardinality) {
-    for (int i = 0; i < currentBlockCardinality.length; i++) {
-      // add value to map only if does not exist or new cardinality is > existing value
-      String columnUniqueId = currentBlockSchema.get(i).getColumnUniqueId();
-      Integer value = columnCardinalityMap.get(columnUniqueId);
-      if (null == value) {
-        columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
-      } else {
-        if (currentBlockCardinality[i] > value) {
-          columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
-        }
-      }
-    }
-  }
-
-  /**
-   * This method will return the updated cardinality according to the master schema
-   *
-   * @param columnCardinalityMap
-   * @param carbonTable
-   * @param updatedColumnSchemaList
-   * @return
-   */
-  public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
-      CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
-    List<CarbonDimension> masterDimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
-    List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
-    for (CarbonDimension dimension : masterDimensions) {
-      Integer value = columnCardinalityMap.get(dimension.getColumnId());
-      if (null == value) {
-        updatedCardinalityList.add(getDimensionDefaultCardinality(dimension));
-      } else {
-        updatedCardinalityList.add(value);
-      }
-      updatedColumnSchemaList.add(dimension.getColumnSchema());
-    }
-    // add measures to the column schema list
-    List<CarbonMeasure> masterSchemaMeasures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
-    for (CarbonMeasure measure : masterSchemaMeasures) {
-      updatedColumnSchemaList.add(measure.getColumnSchema());
-    }
-    int[] updatedCardinality = ArrayUtils
-        .toPrimitive(updatedCardinalityList.toArray(new Integer[updatedCardinalityList.size()]));
-    return updatedCardinality;
-  }
-
-  /**
-   * This method will return the default cardinality based on dimension type
-   *
-   * @param dimension
-   * @return
-   */
-  private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
-    int cardinality = 0;
-    if (dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-      cardinality = Integer.MAX_VALUE;
-    } else if (dimension.hasEncoding(Encoding.DICTIONARY)) {
-      if (null != dimension.getDefaultValue()) {
-        cardinality = CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1;
-      } else {
-        cardinality = CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY;
-      }
-    } else {
-      cardinality = -1;
-    }
-    return cardinality;
-  }
-}
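
How the request-file helpers above fit together (a sketch of the trigger flow; the surrounding loop lives in the data-management code):

    String metaPath = carbonTable.getMetaDataFilepath();

    // a request arriving while another compaction is running is queued as a
    // marker file rather than rejected
    CarbonCompactionUtil.createCompactionRequiredFile(metaPath, compactionType);

    // the background flow later consumes queued requests
    if (CarbonCompactionUtil.isCompactionRequiredForTable(metaPath)) {
      CompactionType type = CarbonCompactionUtil.determineCompactionType(metaPath);
      // ... run a compaction of the determined type ...
      CarbonCompactionUtil.deleteCompactionRequiredFile(metaPath, type);  // consume the marker
    }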