Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:51 UTC
[13/14] incubator-carbondata git commit: rebase
rebase
rebase
rename package
rebase
change package name
fix style
fix spark2
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/66ccd308
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/66ccd308
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/66ccd308
Branch: refs/heads/master
Commit: 66ccd308536d5e64f05fe57aa3a7a9003b4adf5a
Parents: 567fa51
Author: jackylk <ja...@huawei.com>
Authored: Tue Nov 29 17:42:06 2016 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 30 13:19:54 2016 +0530
----------------------------------------------------------------------
.../examples/GenerateDictionaryExample.scala | 2 +-
integration/spark-common/pom.xml | 2 +-
.../MalformedCarbonCommandException.java | 83 ++
.../carbondata/spark/load/CarbonLoaderUtil.java | 973 ++++++++++++++++++
.../spark/load/DeleteLoadFolders.java | 259 +++++
.../spark/load/DeletedLoadMetadata.java | 53 +
.../spark/merger/CarbonCompactionExecutor.java | 233 +++++
.../spark/merger/CarbonCompactionUtil.java | 283 ++++++
.../spark/merger/CarbonDataMergerUtil.java | 695 +++++++++++++
.../spark/merger/CompactionCallable.java | 44 +
.../carbondata/spark/merger/CompactionType.java | 28 +
.../spark/merger/NodeBlockRelation.java | 60 ++
.../spark/merger/NodeMultiBlockRelation.java | 59 ++
.../spark/merger/RowResultMerger.java | 336 +++++++
.../carbondata/spark/merger/TableMeta.java | 42 +
.../spark/merger/TupleConversionAdapter.java | 85 ++
.../spark/partition/api/DataPartitioner.java | 54 +
.../spark/partition/api/Partition.java | 42 +
.../api/impl/DataPartitionerProperties.java | 87 ++
.../partition/api/impl/DefaultLoadBalancer.java | 69 ++
.../spark/partition/api/impl/PartitionImpl.java | 54 +
.../api/impl/PartitionMultiFileImpl.java | 51 +
.../api/impl/QueryPartitionHelper.java | 77 ++
.../api/impl/SampleDataPartitionerImpl.java | 151 +++
.../readsupport/SparkRowReadSupportImpl.java | 69 ++
.../carbondata/spark/splits/TableSplit.java | 129 +++
.../carbondata/spark/util/CarbonQueryUtil.java | 142 +++
.../carbondata/spark/util/LoadMetadataUtil.java | 61 ++
.../spark/CarbonAliasDecoderRelation.scala | 43 +
.../spark/CarbonColumnValidator.scala | 36 +
.../apache/carbondata/spark/CarbonFilters.scala | 391 ++++++++
.../apache/carbondata/spark/CarbonOption.scala | 48 +
.../carbondata/spark/CarbonSparkFactory.scala | 59 ++
.../spark/DictionaryDetailHelper.scala | 63 ++
.../org/apache/carbondata/spark/KeyVal.scala | 89 ++
.../carbondata/spark/csv/CarbonCsvReader.scala | 182 ++++
.../spark/csv/CarbonCsvRelation.scala | 249 +++++
.../carbondata/spark/csv/CarbonTextFile.scala | 91 ++
.../carbondata/spark/csv/DefaultSource.scala | 183 ++++
.../spark/rdd/CarbonCleanFilesRDD.scala | 82 ++
.../spark/rdd/CarbonDataLoadRDD.scala | 598 ++++++++++++
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 91 ++
.../spark/rdd/CarbonDeleteLoadRDD.scala | 84 ++
.../spark/rdd/CarbonDropTableRDD.scala | 71 ++
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 557 +++++++++++
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 342 +++++++
.../spark/rdd/CarbonSparkPartition.scala | 35 +
.../apache/carbondata/spark/rdd/Compactor.scala | 130 +++
.../spark/rdd/DataLoadCoalescedRDD.scala | 68 ++
.../spark/rdd/DataLoadPartitionCoalescer.scala | 363 +++++++
.../spark/tasks/DictionaryWriterTask.scala | 106 ++
.../spark/tasks/SortIndexWriterTask.scala | 59 ++
.../carbondata/spark/util/CarbonScalaUtil.scala | 195 ++++
.../carbondata/spark/util/CommonUtil.scala | 259 +++++
.../spark/util/DataTypeConverterUtil.scala | 74 ++
.../spark/util/GlobalDictionaryUtil.scala | 843 ++++++++++++++++
.../CarbonTableIdentifierImplicit.scala | 42 +
.../execution/command/carbonTableSchema.scala | 359 +++++++
.../spark/sql/hive/DistributionUtil.scala | 167 ++++
.../CarbonDecoderOptimizerHelper.scala | 149 +++
.../scala/org/apache/spark/util/FileUtils.scala | 94 ++
.../apache/spark/util/ScalaCompilerUtil.scala | 35 +
.../scala/org/apache/spark/util/SparkUtil.scala | 73 ++
.../spark/merger/CarbonCompactionExecutor.java | 233 -----
.../spark/merger/CarbonCompactionUtil.java | 284 ------
.../spark/merger/CompactionCallable.java | 44 -
.../spark/merger/CompactionType.java | 28 -
.../spark/merger/RowResultMerger.java | 336 -------
.../spark/merger/TupleConversionAdapter.java | 85 --
.../MalformedCarbonCommandException.java | 83 --
.../carbondata/spark/load/CarbonLoaderUtil.java | 976 -------------------
.../spark/load/DeleteLoadFolders.java | 259 -----
.../spark/load/DeleteLoadFromMetadata.java | 44 -
.../spark/load/DeletedLoadMetadata.java | 53 -
.../spark/merger/CarbonDataMergerUtil.java | 696 -------------
.../spark/merger/NodeBlockRelation.java | 60 --
.../spark/merger/NodeMultiBlockRelation.java | 59 --
.../spark/partition/api/DataPartitioner.java | 54 -
.../spark/partition/api/Partition.java | 42 -
.../api/impl/DataPartitionerProperties.java | 87 --
.../partition/api/impl/DefaultLoadBalancer.java | 69 --
.../spark/partition/api/impl/PartitionImpl.java | 54 -
.../api/impl/PartitionMultiFileImpl.java | 51 -
.../api/impl/QueryPartitionHelper.java | 77 --
.../api/impl/SampleDataPartitionerImpl.java | 151 ---
.../readsupport/SparkRowReadSupportImpl.java | 69 --
.../carbondata/spark/splits/TableSplit.java | 129 ---
.../carbondata/spark/util/CarbonQueryUtil.java | 142 ---
.../carbondata/spark/util/LoadMetadataUtil.java | 61 --
.../spark/CarbonColumnValidator.scala | 36 -
.../spark/CarbonDataFrameWriter.scala | 2 +-
.../apache/carbondata/spark/CarbonFilters.scala | 391 --------
.../apache/carbondata/spark/CarbonOption.scala | 46 -
.../carbondata/spark/CarbonSparkFactory.scala | 60 --
.../spark/DictionaryDetailHelper.scala | 62 --
.../org/apache/carbondata/spark/KeyVal.scala | 89 --
.../carbondata/spark/csv/CarbonCsvReader.scala | 182 ----
.../spark/csv/CarbonCsvRelation.scala | 248 -----
.../carbondata/spark/csv/CarbonTextFile.scala | 63 --
.../carbondata/spark/csv/DefaultSource.scala | 182 ----
.../spark/rdd/CarbonCleanFilesRDD.scala | 83 --
.../spark/rdd/CarbonDataLoadRDD.scala | 604 ------------
.../spark/rdd/CarbonDataRDDFactory.scala | 80 +-
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 92 --
.../spark/rdd/CarbonDeleteLoadRDD.scala | 84 --
.../spark/rdd/CarbonDropTableRDD.scala | 72 --
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 558 -----------
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 344 -------
.../carbondata/spark/rdd/CarbonScanRDD.scala | 8 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 133 ---
.../spark/rdd/NewCarbonDataLoadRDD.scala | 1 -
.../spark/tasks/DictionaryWriterTask.scala | 106 --
.../spark/tasks/SortIndexWriterTask.scala | 59 --
.../carbondata/spark/util/CarbonScalaUtil.scala | 219 -----
.../carbondata/spark/util/CommonUtil.scala | 251 -----
.../spark/util/DataTypeConverterUtil.scala | 55 --
.../spark/util/GlobalDictionaryUtil.scala | 875 -----------------
.../apache/spark/rdd/DataLoadCoalescedRDD.scala | 68 --
.../spark/rdd/DataLoadPartitionCoalescer.scala | 363 -------
.../spark/sql/CarbonCatalystOperators.scala | 8 +-
.../org/apache/spark/sql/CarbonContext.scala | 4 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 15 +-
.../spark/sql/CarbonDatasourceRelation.scala | 8 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 13 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 50 +-
.../scala/org/apache/spark/sql/CarbonScan.scala | 35 +-
.../org/apache/spark/sql/CarbonSparkUtil.scala | 45 +
.../org/apache/spark/sql/CarbonSqlParser.scala | 8 +-
.../CarbonTableIdentifierImplicit.scala | 42 -
.../execution/command/carbonTableSchema.scala | 502 ++--------
.../apache/spark/sql/hive/CarbonMetastore.scala | 562 +++++++++++
.../spark/sql/hive/CarbonMetastoreCatalog.scala | 576 -----------
.../spark/sql/hive/CarbonStrategies.scala | 19 +-
.../spark/sql/hive/DistributionUtil.scala | 145 ---
.../execution/command/CarbonHiveCommands.scala | 8 +-
.../CarbonDecoderOptimizerHelper.scala | 149 ---
.../spark/sql/optimizer/CarbonOptimizer.scala | 24 +-
.../scala/org/apache/spark/util/FileUtils.scala | 94 --
.../apache/spark/util/ScalaCompilerUtil.scala | 35 -
.../org/apache/spark/util/SplitUtils.scala | 67 --
.../TestDataLoadPartitionCoalescer.scala | 2 -
.../spark/util/AllDictionaryTestCase.scala | 2 +-
.../AutoHighCardinalityIdentifyTestCase.scala | 2 +-
.../util/ExternalColumnDictionaryTestCase.scala | 2 +-
...GlobalDictionaryUtilConcurrentTestCase.scala | 2 +-
.../util/GlobalDictionaryUtilTestCase.scala | 2 +-
.../scala/org/apache/spark/sql/CarbonScan.scala | 127 +++
.../execution/CarbonLateDecodeStrategy.scala | 130 +++
148 files changed, 11561 insertions(+), 11292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
index 2d7aed0..e8c437d 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
@@ -63,7 +63,7 @@ object GenerateDictionaryExample {
dictFolderPath: String) {
val dataBaseName = carbonTableIdentifier.getDatabaseName
val tableName = carbonTableIdentifier.getTableName
- val carbonRelation = CarbonEnv.getInstance(cc).carbonCatalog.
+ val carbonRelation = CarbonEnv.get.carbonMetastore.
lookupRelation1(Option(dataBaseName),
tableName) (cc).asInstanceOf[CarbonRelation]
val carbonTable = carbonRelation.tableMeta.carbonTable
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index ad312f3..b0ab3ef 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -77,7 +77,7 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_${scala.binary.version}</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
new file mode 100644
index 0000000..de7d4a2
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.exception;
+
+
+import java.util.Locale;
+
+public class MalformedCarbonCommandException extends Exception {
+
+
+ /**
+ * default serial version ID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The Error message.
+ */
+ private String msg = "";
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public MalformedCarbonCommandException(String msg) {
+ super(msg);
+ this.msg = msg;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public MalformedCarbonCommandException(String msg, Throwable t) {
+ super(msg, t);
+ this.msg = msg;
+ }
+
+ /**
+ * This method is used to get the localized message.
+ *
+ * @param locale - A Locale object represents a specific geographical,
+ * political, or cultural region.
+ * @return - Localized error message.
+ */
+ public String getLocalizedMessage(Locale locale) {
+ return "";
+ }
+
+ /**
+ * getLocalizedMessage
+ */
+ @Override
+ public String getLocalizedMessage() {
+ return super.getLocalizedMessage();
+ }
+
+ /**
+ * getMessage
+ */
+ @Override
+ public String getMessage() {
+ return this.msg;
+ }
+}
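For reference, the two constructors above are typically exercised from command validation code. A minimal sketch (the validator class and property name below are hypothetical, not part of this patch):

    // Hypothetical validator: surfaces bad user input as a
    // MalformedCarbonCommandException, wrapping the low-level cause.
    import org.apache.carbondata.spark.exception.MalformedCarbonCommandException;

    public class TablePropertyValidator {
      public static void validateBlockSize(String value)
          throws MalformedCarbonCommandException {
        try {
          if (Integer.parseInt(value) <= 0) {
            throw new MalformedCarbonCommandException(
                "TABLE_BLOCKSIZE must be positive, got: " + value);
          }
        } catch (NumberFormatException e) {
          // the second constructor keeps the original cause in the chain
          throw new MalformedCarbonCommandException(
              "TABLE_BLOCKSIZE is not a number: " + value, e);
        }
      }
    }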
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
new file mode 100644
index 0000000..492ceee
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -0,0 +1,973 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.load;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
+import org.apache.carbondata.lcm.locks.ICarbonLock;
+import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
+import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
+import org.apache.carbondata.processing.csvload.DataGraphExecuter;
+import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
+import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
+import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.spark.merger.NodeBlockRelation;
+import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
+
+import com.google.gson.Gson;
+import org.apache.spark.SparkConf;
+import org.apache.spark.util.Utils;
+
+
+public final class CarbonLoaderUtil {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+ /**
+ * minimum number of blocklets required for distribution
+ */
+ private static int minBlockLetsReqForDistribution = 0;
+
+ static {
+ String property = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE);
+ try {
+ minBlockLetsReqForDistribution = Integer.parseInt(property);
+ } catch (NumberFormatException ne) {
+ LOGGER.info("Invalid configuration. Consisering the defaul");
+ minBlockLetsReqForDistribution =
+ CarbonCommonConstants.DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE;
+ }
+ }
+
+ private CarbonLoaderUtil() {
+
+ }
+
+ private static void generateGraph(IDataProcessStatus dataProcessTaskStatus, SchemaInfo info,
+ CarbonLoadModel loadModel, String outputLocation)
+ throws GraphGeneratorException {
+ DataLoadModel model = new DataLoadModel();
+ model.setCsvLoad(null != dataProcessTaskStatus.getCsvFilePath()
+ || null != dataProcessTaskStatus.getFilesToProcess());
+ model.setSchemaInfo(info);
+ model.setTableName(dataProcessTaskStatus.getTableName());
+ List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
+ if (null != loadMetadataDetails && !loadMetadataDetails.isEmpty()) {
+ model.setLoadNames(
+ CarbonDataProcessorUtil.getLoadNameFromLoadMetaDataDetails(loadMetadataDetails));
+ model.setModificationOrDeletionTime(CarbonDataProcessorUtil
+ .getModificationOrDeletionTimesFromLoadMetadataDetails(loadMetadataDetails));
+ }
+ model.setBlocksID(dataProcessTaskStatus.getBlocksID());
+ model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter());
+ model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter());
+ model.setCommentCharacter(dataProcessTaskStatus.getCommentCharacter());
+ model.setRddIteratorKey(dataProcessTaskStatus.getRddIteratorKey());
+ model.setTaskNo(loadModel.getTaskNo());
+ model.setFactTimeStamp(loadModel.getFactTimeStamp());
+ model.setMaxColumns(loadModel.getMaxColumns());
+ model.setDateFormat(loadModel.getDateFormat());
+ boolean hdfsReadMode =
+ dataProcessTaskStatus.getCsvFilePath() != null
+ && dataProcessTaskStatus.getCsvFilePath().startsWith("hdfs:");
+ int allocate =
+ null != dataProcessTaskStatus.getCsvFilePath()
+ ? 1 : dataProcessTaskStatus.getFilesToProcess().size();
+ GraphGenerator generator = new GraphGenerator(model, hdfsReadMode, loadModel.getPartitionId(),
+ loadModel.getStorePath(), allocate,
+ loadModel.getCarbonDataLoadSchema(), loadModel.getSegmentId(), outputLocation);
+ generator.generateGraph();
+ }
+
+ public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
+ String storePath, String kettleHomePath) throws Exception {
+ System.setProperty("KETTLE_HOME", kettleHomePath);
+ if (!new File(storeLocation).mkdirs()) {
+ LOGGER.error("Error while creating the temp store path: " + storeLocation);
+ }
+ String outPutLoc = storeLocation + "/etl";
+ String databaseName = loadModel.getDatabaseName();
+ String tableName = loadModel.getTableName();
+ String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+ + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+ CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, storePath);
+ // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
+ CarbonProperties.getInstance().addProperty("send.signal.load", "false");
+
+ String fileNamePrefix = "";
+ if (loadModel.isAggLoadRequest()) {
+ fileNamePrefix = "graphgenerator";
+ }
+ String graphPath =
+ outPutLoc + File.separator + databaseName + File.separator + tableName + File.separator
+ + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo() + File.separator
+ + tableName + fileNamePrefix + ".ktr";
+ File path = new File(graphPath);
+ if (path.exists()) {
+ path.delete();
+ }
+
+ DataProcessTaskStatus dataProcessTaskStatus
+ = new DataProcessTaskStatus(databaseName, tableName);
+ dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
+ dataProcessTaskStatus.setDimCSVDirLoc(loadModel.getDimFolderPath());
+ if (loadModel.isDirectLoad()) {
+ dataProcessTaskStatus.setFilesToProcess(loadModel.getFactFilesToProcess());
+ dataProcessTaskStatus.setDirectLoad(true);
+ dataProcessTaskStatus.setCsvDelimiter(loadModel.getCsvDelimiter());
+ dataProcessTaskStatus.setCsvHeader(loadModel.getCsvHeader());
+ }
+
+ dataProcessTaskStatus.setBlocksID(loadModel.getBlocksID());
+ dataProcessTaskStatus.setEscapeCharacter(loadModel.getEscapeChar());
+ dataProcessTaskStatus.setQuoteCharacter(loadModel.getQuoteChar());
+ dataProcessTaskStatus.setCommentCharacter(loadModel.getCommentChar());
+ dataProcessTaskStatus.setRddIteratorKey(loadModel.getRddIteratorKey());
+ dataProcessTaskStatus.setDateFormat(loadModel.getDateFormat());
+ SchemaInfo info = new SchemaInfo();
+
+ info.setDatabaseName(databaseName);
+ info.setTableName(tableName);
+ info.setAutoAggregateRequest(loadModel.isAggLoadRequest());
+ info.setComplexDelimiterLevel1(loadModel.getComplexDelimiterLevel1());
+ info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2());
+ info.setSerializationNullFormat(loadModel.getSerializationNullFormat());
+ info.setBadRecordsLoggerEnable(loadModel.getBadRecordsLoggerEnable());
+ info.setBadRecordsLoggerAction(loadModel.getBadRecordsAction());
+
+ generateGraph(dataProcessTaskStatus, info, loadModel, outPutLoc);
+
+ DataGraphExecuter graphExecuter = new DataGraphExecuter(dataProcessTaskStatus);
+ graphExecuter
+ .executeGraph(graphPath, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN),
+ info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
+ }
+
+ public static List<String> addNewSliceNameToList(String newSlice, List<String> activeSlices) {
+ activeSlices.add(newSlice);
+ return activeSlices;
+ }
+
+ public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
+ CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+ for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
+ deleteStorePath(segmentPath);
+ }
+ }
+
+ public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
+ final boolean isCompactionFlow) throws IOException {
+ CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+ String metaDataLocation = carbonTable.getMetaDataFilepath();
+ final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+ // delete folders whose metadata does not exist in tablestatus
+ for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+ final String partitionCount = i + "";
+ String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+ FileType fileType = FileFactory.getFileType(partitionPath);
+ if (FileFactory.isFileExist(partitionPath, fileType)) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile path) {
+ String segmentId =
+ CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+ boolean found = false;
+ for (int j = 0; j < details.length; j++) {
+ if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
+ .equals(partitionCount)) {
+ found = true;
+ break;
+ }
+ }
+ return !found;
+ }
+ });
+ for (int k = 0; k < listFiles.length; k++) {
+ String segmentId =
+ CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+ if (isCompactionFlow) {
+ if (segmentId.contains(".")) {
+ deleteStorePath(listFiles[k].getAbsolutePath());
+ }
+ } else {
+ if (!segmentId.contains(".")) {
+ deleteStorePath(listFiles[k].getAbsolutePath());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public static void deleteStorePath(String path) {
+ try {
+ FileType fileType = FileFactory.getFileType(path);
+ if (FileFactory.isFileExist(path, fileType)) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+ CarbonUtil.deleteFoldersAndFiles(carbonFile);
+ }
+ } catch (IOException e) {
+ LOGGER.error("Unable to delete the given path :: " + e.getMessage());
+ } catch (CarbonUtilException e) {
+ LOGGER.error("Unable to delete the given path :: " + e.getMessage());
+ }
+ }
+
+ public static List<String> getListOfValidSlices(LoadMetadataDetails[] details) {
+ List<String> activeSlices =
+ new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (LoadMetadataDetails oneLoad : details) {
+ if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS.equals(oneLoad.getLoadStatus())
+ || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(oneLoad.getLoadStatus())
+ || CarbonCommonConstants.MARKED_FOR_UPDATE.equals(oneLoad.getLoadStatus())) {
+ if (null != oneLoad.getMergedLoadName()) {
+ String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getMergedLoadName();
+ activeSlices.add(loadName);
+ } else {
+ String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
+ activeSlices.add(loadName);
+ }
+ }
+ }
+ return activeSlices;
+ }
+
+ public static List<String> getListOfUpdatedSlices(LoadMetadataDetails[] details) {
+ List<String> updatedSlices =
+ new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (LoadMetadataDetails oneLoad : details) {
+ if (CarbonCommonConstants.MARKED_FOR_UPDATE.equals(oneLoad.getLoadStatus())) {
+ if (null != oneLoad.getMergedLoadName()) {
+ updatedSlices.add(oneLoad.getMergedLoadName());
+ } else {
+ updatedSlices.add(oneLoad.getLoadName());
+ }
+ }
+ }
+ return updatedSlices;
+ }
+
+ public static void removeSliceFromMemory(String databaseName, String tableName, String loadName) {
+ // TODO: Remove from memory
+ }
+
+ /**
+ * This method will delete the local data load folder location after data load is complete
+ *
+ * @param loadModel
+ */
+ public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
+ boolean isCompactionFlow) {
+ String databaseName = loadModel.getDatabaseName();
+ String tableName = loadModel.getTableName();
+ String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+ + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+ if (isCompactionFlow) {
+ tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
+ }
+ // form local store location
+ String localStoreLocation = CarbonProperties.getInstance()
+ .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+ try {
+ CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile() });
+ LOGGER.info("Deleted the local store location" + localStoreLocation);
+ } catch (CarbonUtilException e) {
+ LOGGER.error(e, "Failed to delete local data load folder location");
+ }
+
+ }
+
+ /**
+ * This method will get the store location for the given path, segment id and partition id
+ *
+ * @param storePath
+ * @param carbonTableIdentifier
+ * @param segmentId
+ * @param partitionId
+ * @return
+ */
+ public static String getStoreLocation(String storePath,
+ CarbonTableIdentifier carbonTableIdentifier, String segmentId, String partitionId) {
+ CarbonTablePath carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
+ String carbonDataFilePath = carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
+ return carbonDataFilePath;
+ }
+
+ /**
+ * This API will write the load-level metadata for the load management module in order to
+ * manage load and query execution smoothly.
+ *
+ * @param loadCount
+ * @param loadMetadataDetails
+ * @param loadModel
+ * @param loadStatus
+ * @param startLoadTime
+ * @return boolean which determines whether status update is done or not.
+ * @throws IOException
+ */
+ public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails,
+ CarbonLoadModel loadModel, String loadStatus, String startLoadTime) throws IOException {
+
+ boolean status = false;
+
+ String metaDataFilepath =
+ loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
+
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
+
+ String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+ ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
+
+ try {
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info(
+ "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+ + " for table status updation");
+
+ LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+ SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+ String loadEnddate = readCurrentTime();
+ loadMetadataDetails.setTimestamp(loadEnddate);
+ loadMetadataDetails.setLoadStatus(loadStatus);
+ loadMetadataDetails.setLoadName(String.valueOf(loadCount));
+ loadMetadataDetails.setLoadStartTime(startLoadTime);
+
+ List<LoadMetadataDetails> listOfLoadFolderDetails =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ if (null != listOfLoadFolderDetailsArray) {
+ for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+ listOfLoadFolderDetails.add(loadMetadata);
+ }
+ }
+ listOfLoadFolderDetails.add(loadMetadataDetails);
+
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
+ .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+
+ status = true;
+ } else {
+ LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
+ .getDatabaseName() + "." + loadModel.getTableName());
+ }
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info(
+ "Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
+ + "." + loadModel.getTableName());
+ } else {
+ LOGGER.error(
+ "Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
+ .getTableName() + " during table status updation");
+ }
+ }
+ return status;
+ }
+
+ public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
+ String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(schema.getCarbonTable().getStorePath(),
+ schema.getCarbonTable().getCarbonTableIdentifier());
+ String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+
+ DataOutputStream dataOutputStream;
+ Gson gsonObjectToWrite = new Gson();
+ BufferedWriter brWriter = null;
+
+ AtomicFileOperations writeOperation =
+ new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+
+ try {
+
+ dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
+ brWriter.write(metadataInstance);
+ } finally {
+ try {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ } catch (Exception e) {
+ LOGGER.error("error in flushing ");
+
+ }
+ CarbonUtil.closeStreams(brWriter);
+ writeOperation.close();
+ }
+
+ }
+
+ public static String readCurrentTime() {
+ SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+ return sdf.format(new Date());
+ }
+
+ public static String extractLoadMetadataFileLocation(CarbonLoadModel loadModel) {
+ CarbonTable carbonTable =
+ org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+ .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
+ return carbonTable.getMetaDataFilepath();
+ }
+
+ public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
+ String carbonStorePath) throws CarbonUtilException {
+ Cache dictCache =
+ CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
+ return (Dictionary) dictCache.get(columnIdentifier);
+ }
+
+ public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
+ ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
+ throws CarbonUtilException {
+ return getDictionary(
+ new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType),
+ carbonStorePath);
+ }
+
+ /**
+ * This method will divide the blocks among the tasks of the nodes as per the data locality
+ *
+ * @param blockInfos
+ * @param noOfNodesInput -1 if number of nodes has to be decided
+ * based on block location information
+ * @param parallelism total no of tasks to execute in parallel
+ * @return
+ */
+ public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
+ List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
+ List<String> activeNode) {
+
+ Map<String, List<Distributable>> mapOfNodes =
+ CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+ int taskPerNode = parallelism / mapOfNodes.size();
+ // ensure noOfTasksPerNode is at least 1
+ int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
+ // divide the blocks of a node among the tasks of the node.
+ return assignBlocksToTasksPerNode(mapOfNodes, noOfTasksPerNode);
+ }
+
+ /**
+ * This method will divide the blocks among the nodes as per the data locality
+ *
+ * @param blockInfos
+ * @return
+ */
+ public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
+ int noOfNodesInput) {
+ return nodeBlockMapping(blockInfos, noOfNodesInput, null);
+ }
+
+ /**
+ * This method will divide the blocks among the nodes as per the data locality
+ *
+ * @param blockInfos
+ * @return
+ */
+ public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
+ // -1 if number of nodes has to be decided based on block location information
+ return nodeBlockMapping(blockInfos, -1);
+ }
+
+ /**
+ * the method returns the number of required executors
+ *
+ * @param blockInfos
+ * @return
+ */
+ public static Map<String, List<Distributable>> getRequiredExecutors(
+ List<Distributable> blockInfos) {
+ List<NodeBlockRelation> flattenedList =
+ new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (Distributable blockInfo : blockInfos) {
+ try {
+ for (String eachNode : blockInfo.getLocations()) {
+ NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+ flattenedList.add(nbr);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
+ }
+ }
+ // sort the flattened data.
+ Collections.sort(flattenedList);
+ Map<String, List<Distributable>> nodeAndBlockMapping =
+ new LinkedHashMap<String, List<Distributable>>(
+ CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ // from the flattened list create a mapping of node vs Data blocks.
+ createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+ return nodeAndBlockMapping;
+ }
+
+ /**
+ * This method will divide the blocks among the nodes as per the data locality
+ *
+ * @param blockInfos
+ * @param noOfNodesInput -1 if number of nodes has to be decided
+ * based on block location information
+ * @return
+ */
+ public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
+ int noOfNodesInput, List<String> activeNodes) {
+
+ Map<String, List<Distributable>> nodeBlocksMap =
+ new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ List<NodeBlockRelation> flattenedList =
+ new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ Set<Distributable> uniqueBlocks =
+ new HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ Set<String> nodes = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
+
+ int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
+ if (null != activeNodes) {
+ noofNodes = activeNodes.size();
+ }
+ int blocksPerNode = blockInfos.size() / noofNodes;
+ blocksPerNode = blocksPerNode <= 0 ? 1 : blocksPerNode;
+
+ // sort the flattened data.
+ Collections.sort(flattenedList);
+
+ Map<String, List<Distributable>> nodeAndBlockMapping =
+ new LinkedHashMap<String, List<Distributable>>(
+ CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ // from the flattened list create a mapping of node vs Data blocks.
+ createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+
+ // now we have a map of node vs blocks; allocate the blocks in that order
+ createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
+
+ // if any blocks remain then assign them to nodes in round robin.
+ assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
+
+ return nodeBlocksMap;
+ }
+
+ /**
+ * Assigning the blocks of a node to tasks.
+ *
+ * @param nodeBlocksMap nodeName to list of blocks mapping
+ * @param noOfTasksPerNode
+ * @return
+ */
+ private static Map<String, List<List<Distributable>>> assignBlocksToTasksPerNode(
+ Map<String, List<Distributable>> nodeBlocksMap, int noOfTasksPerNode) {
+ Map<String, List<List<Distributable>>> outputMap =
+ new HashMap<String, List<List<Distributable>>>(
+ CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ // for each node
+ for (Map.Entry<String, List<Distributable>> eachNode : nodeBlocksMap.entrySet()) {
+
+ List<Distributable> blockOfEachNode = eachNode.getValue();
+ // sort the blocks so the same block will be given to the same executor
+ Collections.sort(blockOfEachNode);
+ // create the task list for each node.
+ createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
+
+ // take all the block of node and divide it among the tasks of a node.
+ divideBlockToTasks(outputMap, eachNode.getKey(), blockOfEachNode);
+ }
+
+ return outputMap;
+ }
+
+ /**
+ * This will divide the blocks of a node to tasks of the node.
+ *
+ * @param outputMap
+ * @param key
+ * @param blockOfEachNode
+ */
+ private static void divideBlockToTasks(Map<String, List<List<Distributable>>> outputMap,
+ String key, List<Distributable> blockOfEachNode) {
+
+ List<List<Distributable>> taskLists = outputMap.get(key);
+ int tasksOfNode = taskLists.size();
+ int i = 0;
+ for (Distributable block : blockOfEachNode) {
+
+ taskLists.get(i % tasksOfNode).add(block);
+ i++;
+ }
+
+ }
+
+ /**
+ * This will create the empty list for each task of a node.
+ *
+ * @param outputMap
+ * @param noOfTasksPerNode
+ * @param key
+ */
+ private static void createTaskListForNode(Map<String, List<List<Distributable>>> outputMap,
+ int noOfTasksPerNode, String key) {
+ List<List<Distributable>> nodeTaskList =
+ new ArrayList<List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (int i = 0; i < noOfTasksPerNode; i++) {
+ List<Distributable> eachTask =
+ new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ nodeTaskList.add(eachTask);
+
+ }
+ outputMap.put(key, nodeTaskList);
+
+ }
+
+ /**
+ * If any leftover data blocks are present, assign them to the nodes in a round-robin way.
+ *
+ * @param outputMap
+ * @param uniqueBlocks
+ */
+ private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
+ Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
+
+ if (activeNodes != null) {
+ for (String activeNode : activeNodes) {
+ List<Distributable> blockLst = outputMap.get(activeNode);
+ if (null == blockLst) {
+ blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+ populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+ if (blockLst.size() > 0) {
+ outputMap.put(activeNode, blockLst);
+ }
+ }
+ } else {
+ for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+ List<Distributable> blockLst = entry.getValue();
+ populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+ }
+
+ }
+
+ for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+ Iterator<Distributable> blocks = uniqueBlocks.iterator();
+ if (blocks.hasNext()) {
+ Distributable block = blocks.next();
+ List<Distributable> blockLst = entry.getValue();
+ blockLst.add(block);
+ blocks.remove();
+ }
+ }
+ }
+
+ /**
+ * This method populates blockLst with the blocks to be allocated to a specific node.
+ * @param uniqueBlocks
+ * @param noOfBlocksPerNode
+ * @param blockLst
+ */
+ private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
+ List<Distributable> blockLst) {
+ Iterator<Distributable> blocks = uniqueBlocks.iterator();
+ // if the node already has its per-node quota of blocks then avoid assigning extra blocks
+ if (blockLst.size() == noOfBlocksPerNode) {
+ return;
+ }
+ while (blocks.hasNext()) {
+ Distributable block = blocks.next();
+ blockLst.add(block);
+ blocks.remove();
+ if (blockLst.size() >= noOfBlocksPerNode) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * To create the final output of the Node and Data blocks
+ *
+ * @param outputMap
+ * @param blocksPerNode
+ * @param uniqueBlocks
+ * @param nodeAndBlockMapping
+ * @param activeNodes
+ */
+ private static void createOutputMap(Map<String, List<Distributable>> outputMap, int blocksPerNode,
+ Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> nodeAndBlockMapping,
+ List<String> activeNodes) {
+
+ ArrayList<NodeMultiBlockRelation> multiBlockRelations =
+ new ArrayList<>(nodeAndBlockMapping.size());
+ for (Map.Entry<String, List<Distributable>> entry : nodeAndBlockMapping.entrySet()) {
+ multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), entry.getValue()));
+ }
+ // sort nodes based on the number of blocks per node, so that nodes having fewer blocks
+ // are assigned first
+ Collections.sort(multiBlockRelations);
+
+ for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+ String nodeName = nodeMultiBlockRelation.getNode();
+ //assign the block to the node only if the node is active
+ String activeExecutor = nodeName;
+ if (null != activeNodes) {
+ activeExecutor = getActiveExecutor(activeNodes, nodeName);
+ if (null == activeExecutor) {
+ continue;
+ }
+ }
+ // this loop will be for each NODE
+ int nodeCapacity = 0;
+ // loop through the blocks of each node
+ for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
+
+ // check if this is already assigned.
+ if (uniqueBlocks.contains(block)) {
+
+ if (null == outputMap.get(activeExecutor)) {
+ List<Distributable> list =
+ new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ outputMap.put(activeExecutor, list);
+ }
+ // assign this block to this node if node has capacity left
+ if (nodeCapacity < blocksPerNode) {
+ List<Distributable> infos = outputMap.get(activeExecutor);
+ infos.add(block);
+ nodeCapacity++;
+ uniqueBlocks.remove(block);
+ } else {
+ // No need to continue loop as node is full
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Validates whether the given node is active and resolves the matching active executor.
+ *
+ * @param activeNode
+ * @param nodeName
+ * @return the active executor name if the node is active, otherwise null.
+ */
+ private static String getActiveExecutor(List<String> activeNode, String nodeName) {
+ boolean isActiveNode = activeNode.contains(nodeName);
+ if (isActiveNode) {
+ return nodeName;
+ }
+ // if localhost, resolve the actual host name and then do the check
+ else if (nodeName.equals("localhost")) {
+ try {
+ String hostName = InetAddress.getLocalHost().getHostName();
+ isActiveNode = activeNode.contains(hostName);
+ if (isActiveNode) {
+ return hostName;
+ }
+ } catch (UnknownHostException ue) {
+ isActiveNode = false;
+ }
+ } else {
+ try {
+ String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
+ isActiveNode = activeNode.contains(hostAddress);
+ if (isActiveNode) {
+ return hostAddress;
+ }
+ } catch (UnknownHostException ue) {
+ isActiveNode = false;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Create the Node and its related blocks Mapping and put in a Map
+ *
+ * @param flattenedList
+ * @param nodeAndBlockMapping
+ */
+ private static void createNodeVsBlockMapping(List<NodeBlockRelation> flattenedList,
+ Map<String, List<Distributable>> nodeAndBlockMapping) {
+ for (NodeBlockRelation nbr : flattenedList) {
+ String node = nbr.getNode();
+ List<Distributable> list;
+
+ if (null == nodeAndBlockMapping.get(node)) {
+ list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ list.add(nbr.getBlock());
+ Collections.sort(list);
+ nodeAndBlockMapping.put(node, list);
+ } else {
+ list = nodeAndBlockMapping.get(node);
+ list.add(nbr.getBlock());
+ Collections.sort(list);
+ }
+ }
+ }
+
+ /**
+ * Create the flat list, i.e. flatten the map.
+ *
+ * @param blockInfos
+ * @param flattenedList
+ * @param uniqueBlocks
+ */
+ private static void createFlattenedListFromMap(List<Distributable> blockInfos,
+ List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
+ Set<String> nodeList) {
+ for (Distributable blockInfo : blockInfos) {
+ // put the blocks in the set
+ uniqueBlocks.add(blockInfo);
+
+ try {
+ for (String eachNode : blockInfo.getLocations()) {
+ NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+ flattenedList.add(nbr);
+ nodeList.add(eachNode);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
+ }
+ }
+ }
+
+ /**
+ * This method will check for the carbon data directory of the given segment and create it if absent
+ *
+ * @param carbonStorePath
+ * @param dbName
+ * @param tableName
+ * @param segmentId
+ */
+ public static void checkAndCreateCarbonDataLocation(String carbonStorePath, String dbName,
+ String tableName, String segmentId) {
+ CarbonTable carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(dbName + CarbonCommonConstants.UNDERSCORE + tableName);
+ CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
+ CarbonTablePath carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
+ String carbonDataDirectoryPath =
+ carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+ }
+
+ /**
+ * return the Array of available local-dirs
+ *
+ * @param conf
+ * @return
+ */
+ public static String[] getConfiguredLocalDirs(SparkConf conf) {
+ return Utils.getConfiguredLocalDirs(conf);
+ }
+
+ /**
+ * This will carry forward the visibility flags from the old table status (before clean files) to the latest table status.
+ * @param oldList
+ * @param newList
+ * @return
+ */
+ public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
+ LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
+
+ List<LoadMetadataDetails> newListMetadata =
+ new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
+ for (LoadMetadataDetails oldSegment : oldList) {
+ if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
+ newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
+ }
+ }
+ return newListMetadata;
+ }
+
+}
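To see the locality-aware assignment in action, here is a hedged sketch that drives nodeBlockTaskMapping with a toy Distributable. It assumes Distributable exposes getLocations() and natural ordering exactly as used by the utility above; ToyBlock and the host names are illustrative only, not part of this patch:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.carbondata.core.carbon.datastore.block.Distributable;
    import org.apache.carbondata.spark.load.CarbonLoaderUtil;

    public class NodeBlockMappingDemo {
      // hypothetical block type; real callers pass block infos from the input files
      static class ToyBlock implements Distributable {
        private final String name;
        private final String[] hosts;
        ToyBlock(String name, String... hosts) { this.name = name; this.hosts = hosts; }
        public String[] getLocations() { return hosts; }
        public int compareTo(Distributable other) { return name.compareTo(other.toString()); }
        @Override public String toString() { return name; }
      }

      public static void main(String[] args) {
        List<Distributable> blocks = Arrays.asList(
            new ToyBlock("block1", "host1", "host2"),
            new ToyBlock("block2", "host2", "host3"),
            new ToyBlock("block3", "host1", "host3"));
        // noOfNodesInput = -1: derive the node count from the block locations
        Map<String, List<List<Distributable>>> plan = CarbonLoaderUtil
            .nodeBlockTaskMapping(blocks, -1, 6, Arrays.asList("host1", "host2", "host3"));
        // each node receives roughly blocks/nodes blocks, split across its task lists
        System.out.println(plan);
      }
    }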
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
new file mode 100644
index 0000000..2b3979f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Project Name : Carbon
+ * Module Name : CARBON spark interface
+ * Author : R00903928
+ * Created Date : 22-Sep-2015
+ * FileName : DeleteLoadFolders.java
+ * Description : for physical deletion of load folders.
+ * Class Version : 1.0
+ */
+package org.apache.carbondata.spark.load;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+
+public final class DeleteLoadFolders {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
+
+ private DeleteLoadFolders() {
+
+ }
+
+ /**
+ * returns segment path
+ *
+ * @param loadModel
+ * @param storeLocation
+ * @param partitionId
+ * @param oneLoad
+ * @return
+ */
+ private static String getSegmentPath(CarbonLoadModel loadModel, String storeLocation,
+ int partitionId, LoadMetadataDetails oneLoad) {
+
+ String segmentId = oneLoad.getLoadName();
+
+ return new CarbonStorePath(storeLocation).getCarbonTablePath(
+ loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())
+ .getCarbonDataDirectoryPath("" + partitionId, segmentId);
+ }
+
+ private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
+
+ boolean status = false;
+ try {
+ if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+ CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+ CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+ @Override public boolean accept(CarbonFile file) {
+ return (CarbonTablePath.isCarbonDataFile(file.getName())
+ || CarbonTablePath.isCarbonIndexFile(file.getName()));
+ }
+ });
+
+ // if there are no fact and measure metadata files present then there is
+ // no need to keep the entry in metadata.
+ if (filesToBeDeleted.length == 0) {
+ status = true;
+ } else {
+
+ for (CarbonFile eachFile : filesToBeDeleted) {
+ if (!eachFile.delete()) {
+ LOGGER.warn("Unable to delete the file as per delete command "
+ + eachFile.getAbsolutePath());
+ status = false;
+ } else {
+ status = true;
+ }
+ }
+ }
+ // need to delete the complete folder.
+ if (status) {
+ if (!file.delete()) {
+ LOGGER.warn("Unable to delete the folder as per delete command "
+ + file.getAbsolutePath());
+ status = false;
+ }
+ }
+
+ } else {
+ status = false;
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Unable to delete the file as per delete command " + path);
+ }
+
+ return status;
+
+ }
+
+ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
+ boolean isForceDelete) {
+ if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus())
+ || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
+ && oneLoad.getVisibility().equalsIgnoreCase("true")) {
+ if (isForceDelete) {
+ return true;
+ }
+ String deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+ SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+ Date deletionDate = null;
+ String date = null;
+ Date currentTimeStamp = null;
+ try {
+ deletionDate = parser.parse(deletionTime);
+ date = CarbonLoaderUtil.readCurrentTime();
+ currentTimeStamp = parser.parse(date);
+ } catch (ParseException e) {
+ return false;
+ }
+
+ long difference = currentTimeStamp.getTime() - deletionDate.getTime();
+
+ long minutesElapsed = (difference / (1000 * 60));
+
+ int maxTime;
+ try {
+ maxTime = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+ } catch (NumberFormatException e) {
+ maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
+ }
+ if (minutesElapsed > maxTime) {
+ return true;
+ }
+
+ }
+
+ return false;
+ }
+
+ private static void factFileRenaming(String loadFolderPath) {
+
+ FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
+ try {
+ if (FileFactory.isFileExist(loadFolderPath, fileType)) {
+ CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
+
+ CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
+
+ @Override public boolean accept(CarbonFile file) {
+ return (file.getName().endsWith('_' + CarbonCommonConstants.FACT_FILE_UPDATED));
+ }
+ });
+
+ for (CarbonFile file : listFiles) {
+ if (!file.renameTo(file.getName().substring(0,
+ file.getName().length() - CarbonCommonConstants.FACT_FILE_UPDATED.length()))) {
+ LOGGER.warn("could not rename the updated fact file.");
+ }
+ }
+
+ }
+ } catch (IOException e) {
+ LOGGER.error("exception" + e.getMessage());
+ }
+
+ }
+
+ private static void cleanDeletedFactFile(String loadFolderPath) {
+ FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
+ try {
+ if (FileFactory.isFileExist(loadFolderPath, fileType)) {
+ CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
+
+ CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
+
+ @Override public boolean accept(CarbonFile file) {
+ return (file.getName().endsWith(CarbonCommonConstants.FACT_DELETE_EXTENSION));
+ }
+ });
+
+ for (CarbonFile file : listFiles) {
+ if (!file.delete()) {
+ LOGGER.warn("could not delete the marked fact file.");
+ }
+ }
+
+ }
+ } catch (IOException e) {
+ LOGGER.error("exception" + e.getMessage());
+ }
+ }
+
+ /**
+ * Physically deletes all load folders that are eligible for deletion and
+ * marks them as invisible in the metadata.
+ *
+ * @param loadModel load model of the table being cleaned
+ * @param storeLocation store path of the carbon store
+ * @param isForceDelete if true, skip the retention-window check
+ * @param details load metadata entries of all segments of the table
+ * @return true if at least one load folder was deleted
+ */
+ public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
+ String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
+
+ boolean isDeleted = false;
+
+ if (details != null && details.length != 0) {
+ for (LoadMetadataDetails oneLoad : details) {
+ if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
+ String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
+ boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
+ if (deletionStatus) {
+ isDeleted = true;
+ oneLoad.setVisibility("false");
+ LOGGER.info("Deleted the load " + oneLoad.getLoadName());
+ }
+ }
+ }
+ }
+
+ return isDeleted;
+ }
+
+
+}
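
A minimal sketch of how a caller might drive this cleanup; the helper name is
hypothetical, and the load model, store location and metadata entries are
assumed to come from the load command and the table status file:

    // Hedged sketch: assumes DeleteLoadFolders and its inputs are on the
    // classpath; nothing here is defined by this patch.
    static boolean cleanExpiredSegments(CarbonLoadModel loadModel,
        String storeLocation, LoadMetadataDetails[] details) {
      // false: honour the MAX_QUERY_EXECUTION_TIME retention window instead
      // of force-deleting; a true result means at least one folder was
      // removed and the updated visibility flags must then be persisted.
      return DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
          loadModel, storeLocation, false, details);
    }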
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
new file mode 100644
index 0000000..661e17c
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.load;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public class DeletedLoadMetadata implements Serializable {
+
+ private static final long serialVersionUID = 7083059404172117208L;
+ private Map<String, String> deletedLoadMetadataMap =
+ new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ public void addDeletedLoadMetadata(String loadId, String status) {
+ deletedLoadMetadataMap.put(loadId, status);
+ }
+
+ public List<String> getDeletedLoadMetadataIds() {
+ return new ArrayList<String>(deletedLoadMetadataMap.keySet());
+ }
+
+ public String getDeletedLoadMetadataStatus(String loadId) {
+ // HashMap returns null for absent keys, so no containsKey check is needed
+ return deletedLoadMetadataMap.get(loadId);
+ }
+
+}
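
A short sketch of exercising this holder (the load ids and status values are
illustrative only):

    DeletedLoadMetadata metadata = new DeletedLoadMetadata();
    metadata.addDeletedLoadMetadata("0", CarbonCommonConstants.MARKED_FOR_DELETE);
    // unknown load ids return null rather than throwing
    assert metadata.getDeletedLoadMetadataStatus("99") == null;
    for (String loadId : metadata.getDeletedLoadMetadataIds()) {
      System.out.println(loadId + " -> " + metadata.getDeletedLoadMetadataStatus(loadId));
    }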
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
new file mode 100644
index 0000000..9fa63d6
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.executor.QueryExecutor;
+import org.apache.carbondata.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.BatchResult;
+import org.apache.carbondata.scan.result.iterator.RawResultIterator;
+
+/**
+ * Executor class for executing the query on the selected segments to be merged.
+ * This will fire a select * query and get the raw result.
+ */
+public class CarbonCompactionExecutor {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
+ private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
+ private final SegmentProperties destinationSegProperties;
+ private final String databaseName;
+ private final String factTableName;
+ private final Map<String, TaskBlockInfo> segmentMapping;
+ private final String storePath;
+ private QueryExecutor queryExecutor;
+ private CarbonTable carbonTable;
+ private QueryModel queryModel;
+
+ /**
+ * Constructor
+ *
+ * @param segmentMapping segment id to task-wise block info of the segments to merge
+ * @param segmentProperties properties of the destination (merged) segment
+ * @param databaseName database of the table being compacted
+ * @param factTableName fact table name
+ * @param storePath carbon store path
+ * @param carbonTable table being compacted
+ * @param dataFileMetadataSegMapping segment id to data file footers of that segment
+ */
+ public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
+ SegmentProperties segmentProperties, String databaseName, String factTableName,
+ String storePath, CarbonTable carbonTable,
+ Map<String, List<DataFileFooter>> dataFileMetadataSegMapping) {
+ this.segmentMapping = segmentMapping;
+ this.destinationSegProperties = segmentProperties;
+ this.databaseName = databaseName;
+ this.factTableName = factTableName;
+ this.storePath = storePath;
+ this.carbonTable = carbonTable;
+ this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
+ }
+
+ /**
+ * For processing of the table blocks.
+ *
+ * @return List of Carbon iterators
+ */
+ public List<RawResultIterator> processTableBlocks() throws QueryExecutionException {
+
+ List<RawResultIterator> resultList =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ // prepare the model once; the block list is set per task in the loop below
+ List<TableBlockInfo> list = null;
+ queryModel = prepareQueryModel(list);
+ // iterate each seg ID
+ for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+ String segmentId = taskMap.getKey();
+ List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+
+ int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality();
+
+ SegmentProperties sourceSegProperties =
+ new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality);
+
+ // for each segment get taskblock info
+ TaskBlockInfo taskBlockInfo = taskMap.getValue();
+ Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+
+ for (String task : taskBlockListMapping) {
+
+ list = taskBlockInfo.getTableBlockInfoList(task);
+ Collections.sort(list);
+ LOGGER.info("for task -" + task + "-block size is -" + list.size());
+ queryModel.setTableBlockInfos(list);
+ resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
+ destinationSegProperties));
+
+ }
+ }
+
+ return resultList;
+ }
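+
+ // Note: one RawResultIterator is produced per (segment, task) pair, e.g.
+ // two segments each written by two tasks yield four iterators, each
+ // converting rows from its source key layout to destinationSegProperties.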
+
+ /**
+ * Get an executor and execute the query model on the given blocks.
+ *
+ * @param blockList blocks to scan
+ * @return iterator over the raw result batches
+ */
+ private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+ throws QueryExecutionException {
+
+ queryModel.setTableBlockInfos(blockList);
+ this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
+ CarbonIterator<BatchResult> iter = null;
+ try {
+ iter = queryExecutor.execute(queryModel);
+ } catch (QueryExecutionException e) {
+ LOGGER.error(e.getMessage());
+ throw e;
+ }
+
+ return iter;
+ }
+
+ /**
+ * Finishes the query execution and clears the dictionary references
+ * held by the query model.
+ */
+ public void finish() {
+ try {
+ queryExecutor.finish();
+ } catch (QueryExecutionException e) {
+ LOGGER.error(e, "Problem while finish: ");
+ }
+ clearDictionaryFromQueryModel();
+ }
+
+ /**
+ * This method will clear the dictionary access count after its usage is complete so
+ * that the column can be deleted from the LRU cache whenever memory reaches the threshold
+ */
+ private void clearDictionaryFromQueryModel() {
+ if (null != queryModel) {
+ Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+ if (null != columnToDictionaryMapping) {
+ for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+ CarbonUtil.clearDictionaryCache(entry.getValue());
+ }
+ }
+ }
+ }
+
+ /**
+ * Prepares a forced detail raw query model over all dimensions and measures
+ * of the destination segment.
+ *
+ * @param blockList blocks to set on the model (may be null; set later per task)
+ * @return the prepared query model
+ */
+ public QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+
+ QueryModel model = new QueryModel();
+
+ model.setTableBlockInfos(blockList);
+ model.setCountStarQuery(false);
+ model.setDetailQuery(true);
+ model.setForcedDetailRawQuery(true);
+ model.setFilterExpressionResolverTree(null);
+
+ List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ for (CarbonDimension dim : destinationSegProperties.getDimensions()) {
+ QueryDimension queryDimension = new QueryDimension(dim.getColName());
+ dims.add(queryDimension);
+ }
+ model.setQueryDimension(dims);
+
+ List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (CarbonMeasure carbonMeasure : destinationSegProperties.getMeasures()) {
+ QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+ msrs.add(queryMeasure);
+ }
+ model.setQueryMeasures(msrs);
+
+ model.setQueryId(String.valueOf(System.nanoTime()));
+
+ model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
+
+ model.setAggTable(false);
+ model.setLimit(-1);
+
+ model.setTable(carbonTable);
+
+ model.setInMemoryRecordSize(CarbonCommonConstants.COMPACTION_INMEMORY_RECORD_SIZE);
+
+ return model;
+ }
+
+}
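
A hedged sketch of the expected call sequence; the helper name is an
assumption, and the two mappings are produced by CarbonCompactionUtil below:

    static List<RawResultIterator> openMergeIterators(
        Map<String, TaskBlockInfo> segmentMapping,
        Map<String, List<DataFileFooter>> footersBySegment,
        SegmentProperties destSegProperties, String storePath,
        CarbonTable table) throws QueryExecutionException {
      CarbonCompactionExecutor executor = new CarbonCompactionExecutor(
          segmentMapping, destSegProperties, table.getDatabaseName(),
          table.getFactTableName(), storePath, table, footersBySegment);
      // one RawResultIterator per (segment, task) pair; the caller must
      // invoke executor.finish() once merging completes
      return executor.processTableBlocks();
    }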
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
new file mode 100644
index 0000000..ca33fac
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+
+/**
+ * Utility Class for the Compaction Flow.
+ */
+public class CarbonCompactionUtil {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonCompactionUtil.class.getName());
+
+ /**
+ * To create a mapping of Segment Id and TaskBlockInfo.
+ *
+ * @param tableBlockInfoList blocks of all segments selected for compaction
+ * @return mapping of segment id to its task-wise block info
+ */
+ public static Map<String, TaskBlockInfo> createMappingForSegments(
+ List<TableBlockInfo> tableBlockInfoList) {
+
+ // stores taskBlockInfo of each segment
+ Map<String, TaskBlockInfo> segmentBlockInfoMapping =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ for (TableBlockInfo info : tableBlockInfoList) {
+ String segId = info.getSegmentId();
+ // check if segId is already present in map
+ TaskBlockInfo taskBlockInfoMapping = segmentBlockInfoMapping.get(segId);
+ // extract task ID from file Path.
+ String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(info.getFilePath());
+ // if taskBlockInfo is not there, then create and add
+ if (null == taskBlockInfoMapping) {
+ taskBlockInfoMapping = new TaskBlockInfo();
+ groupCorrespondingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+ // put the taskBlockInfo with respective segment id
+ segmentBlockInfoMapping.put(segId, taskBlockInfoMapping);
+ } else {
+ groupCorrespondingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+ }
+ }
+ return segmentBlockInfoMapping;
+
+ }
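+
+ // Shape of the result, with hypothetical ids: three blocks of segment "0"
+ // written by tasks "1" and "2" map to
+ // { "0" -> { "1" -> [block_a, block_b], "2" -> [block_c] } }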
+
+ /**
+ * Groups the given block info under its task number.
+ *
+ * @param info block info to add
+ * @param taskBlockMapping task number to block info list mapping
+ * @param taskNo task number extracted from the block's file path
+ */
+ private static void groupCorrespondingInfoBasedOnTask(TableBlockInfo info,
+ TaskBlockInfo taskBlockMapping, String taskNo) {
+ // get the corresponding list from task mapping.
+ List<TableBlockInfo> blockLists = taskBlockMapping.getTableBlockInfoList(taskNo);
+ if (null != blockLists) {
+ blockLists.add(info);
+ } else {
+ blockLists = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ blockLists.add(info);
+ taskBlockMapping.addTableBlockInfoList(taskNo, blockLists);
+ }
+ }
+
+ /**
+ * To create a mapping of Segment Id and DataFileFooter.
+ *
+ * @param tableBlockInfoList blocks of all segments selected for compaction
+ * @return mapping of segment id to the footers of its data files
+ */
+ public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
+ List<TableBlockInfo> tableBlockInfoList) throws IndexBuilderException {
+
+ Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
+ for (TableBlockInfo blockInfo : tableBlockInfoList) {
+ List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
+ String segId = blockInfo.getSegmentId();
+
+ DataFileFooter dataFileMetadata = null;
+ // check if segId is already present in map
+ List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
+ try {
+ dataFileMetadata = CarbonUtil
+ .readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
+ blockInfo.getBlockLength());
+ } catch (CarbonUtilException e) {
+ throw new IndexBuilderException(e);
+ }
+ if (null == metadataList) {
+ // if it is not present, add a new list for this segment
+ eachSegmentBlocks.add(dataFileMetadata);
+ segmentBlockInfoMapping.put(segId, eachSegmentBlocks);
+ } else {
+ // if it is already present then update the list.
+ metadataList.add(dataFileMetadata);
+ }
+ }
+ return segmentBlockInfoMapping;
+
+ }
+
+ /**
+ * Check whether a file indicating a pending compaction request is present.
+ *
+ * @param metaFolderPath metadata folder path of the table
+ * @return true if a minor or major compaction request file exists
+ */
+ public static boolean isCompactionRequiredForTable(String metaFolderPath) {
+ String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.minorCompactionRequiredFile;
+
+ String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.majorCompactionRequiredFile;
+ try {
+ if (FileFactory.isFileExist(minorCompactionStatusFile,
+ FileFactory.getFileType(minorCompactionStatusFile)) || FileFactory
+ .isFileExist(majorCompactionStatusFile,
+ FileFactory.getFileType(majorCompactionStatusFile))) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception in isFileExist compaction request file " + e.getMessage() );
+ }
+ return false;
+ }
+
+ /**
+ * Determine the type of the compaction request received.
+ *
+ * @param metaFolderPath metadata folder path of the table
+ * @return the requested compaction type, defaulting to minor compaction
+ */
+ public static CompactionType determineCompactionType(String metaFolderPath) {
+ String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.minorCompactionRequiredFile;
+
+ String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.majorCompactionRequiredFile;
+ try {
+ if (FileFactory.isFileExist(minorCompactionStatusFile,
+ FileFactory.getFileType(minorCompactionStatusFile))) {
+ return CompactionType.MINOR_COMPACTION;
+ }
+ if (FileFactory.isFileExist(majorCompactionStatusFile,
+ FileFactory.getFileType(majorCompactionStatusFile))) {
+ return CompactionType.MAJOR_COMPACTION;
+ }
+
+ } catch (IOException e) {
+ LOGGER.error("Exception in determining the compaction request file " + e.getMessage() );
+ }
+ return CompactionType.MINOR_COMPACTION;
+ }
+
+ /**
+ * Delete the compaction request file once the compaction is done.
+ *
+ * @param metaFolderPath metadata folder path of the table
+ * @param compactionType type whose request file should be removed
+ * @return true if the request file was deleted
+ */
+ public static boolean deleteCompactionRequiredFile(String metaFolderPath,
+ CompactionType compactionType) {
+ String compactionRequiredFile;
+ if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+ compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.minorCompactionRequiredFile;
+ } else {
+ compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.majorCompactionRequiredFile;
+ }
+ try {
+ if (FileFactory
+ .isFileExist(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))) {
+ if (FileFactory
+ .getCarbonFile(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))
+ .delete()) {
+ LOGGER.info("Deleted the compaction request file " + compactionRequiredFile);
+ return true;
+ } else {
+ LOGGER.error("Unable to delete the compaction request file " + compactionRequiredFile);
+ }
+ } else {
+ LOGGER.info("Compaction request file is not present. file is : " + compactionRequiredFile);
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception in deleting the compaction request file " + e.getMessage());
+ }
+ return false;
+ }
+
+ /**
+ * Create a compaction request file when some other compaction is already in
+ * progress, so that the request can be picked up later.
+ *
+ * @param metaFolderPath metadata folder path of the table
+ * @param compactionType type of the compaction being requested
+ * @return true if the request file was newly created
+ */
+ public static boolean createCompactionRequiredFile(String metaFolderPath,
+ CompactionType compactionType) {
+ String statusFile;
+ if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+ statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.minorCompactionRequiredFile;
+ } else {
+ statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonCommonConstants.majorCompactionRequiredFile;
+ }
+ try {
+ if (!FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
+ if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
+ LOGGER.info("successfully created a compaction required file - " + statusFile);
+ return true;
+ } else {
+ LOGGER.error("Not able to create a compaction required file - " + statusFile);
+ return false;
+ }
+ } else {
+ LOGGER.info("Compaction request file : " + statusFile + " already exist.");
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception in creating the compaction request file " + e.getMessage() );
+ }
+ return false;
+ }
+
+ /**
+ * This will check if any compaction request has been received for any table.
+ *
+ * @param tableMetas metadata of all tables known to the driver
+ * @param skipList tables to exclude from the scan
+ * @return the first table with a pending request, or null if none
+ */
+ public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+ List<CarbonTableIdentifier> skipList) {
+ for (TableMeta table : tableMetas) {
+ CarbonTable ctable = table.carbonTable;
+ String metadataPath = ctable.getMetaDataFilepath();
+ // check for the compaction required file and at the same time exclude the tables which are
+ // present in the skip list.
+ if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
+ .contains(table.carbonTableIdentifier)) {
+ return table;
+ }
+ }
+ return null;
+ }
+}
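
A minimal sketch of the request-file handshake implemented above; the
metadata path is a placeholder:

    String metaFolderPath = "/tmp/carbonstore/default/sales/Metadata";

    // Requesting side: queue a request while another compaction is running.
    CarbonCompactionUtil.createCompactionRequiredFile(metaFolderPath,
        CompactionType.MINOR_COMPACTION);

    // Serving side: once free, look for queued requests, run the compaction
    // of the indicated type, then clear the marker file.
    if (CarbonCompactionUtil.isCompactionRequiredForTable(metaFolderPath)) {
      CompactionType type = CarbonCompactionUtil.determineCompactionType(metaFolderPath);
      // ... perform the compaction ...
      CarbonCompactionUtil.deleteCompactionRequiredFile(metaFolderPath, type);
    }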