Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:45 UTC
[07/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
deleted file mode 100644
index f2a1f9f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ /dev/null
@@ -1,976 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.CarbonUtilException;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
-import org.apache.carbondata.lcm.locks.ICarbonLock;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.csvload.DataGraphExecuter;
-import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
-import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
-import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
-import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.spark.merger.NodeBlockRelation;
-import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
-
-import com.google.gson.Gson;
-import org.apache.spark.SparkConf;
-import org.apache.spark.util.Utils;
-
-
-public final class CarbonLoaderUtil {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
- /**
- * minimum number of blocklets required for distribution
- */
- private static int minBlockLetsReqForDistribution = 0;
-
- static {
- String property = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE);
- try {
- minBlockLetsReqForDistribution = Integer.parseInt(property);
- } catch (NumberFormatException ne) {
- LOGGER.info("Invalid configuration. Consisering the defaul");
- minBlockLetsReqForDistribution =
- CarbonCommonConstants.DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE;
- }
- }
-
- private CarbonLoaderUtil() {
-
- }
-
- private static void generateGraph(IDataProcessStatus dataProcessTaskStatus, SchemaInfo info,
- CarbonLoadModel loadModel, String outputLocation)
- throws GraphGeneratorException {
- DataLoadModel model = new DataLoadModel();
- model.setCsvLoad(null != dataProcessTaskStatus.getCsvFilePath()
- || null != dataProcessTaskStatus.getFilesToProcess());
- model.setSchemaInfo(info);
- model.setTableName(dataProcessTaskStatus.getTableName());
- List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
- if (null != loadMetadataDetails && !loadMetadataDetails.isEmpty()) {
- model.setLoadNames(
- CarbonDataProcessorUtil.getLoadNameFromLoadMetaDataDetails(loadMetadataDetails));
- model.setModificationOrDeletionTime(CarbonDataProcessorUtil
- .getModificationOrDeletionTimesFromLoadMetadataDetails(loadMetadataDetails));
- }
- model.setBlocksID(dataProcessTaskStatus.getBlocksID());
- model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter());
- model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter());
- model.setCommentCharacter(dataProcessTaskStatus.getCommentCharacter());
- model.setRddIteratorKey(dataProcessTaskStatus.getRddIteratorKey());
- model.setTaskNo(loadModel.getTaskNo());
- model.setFactTimeStamp(loadModel.getFactTimeStamp());
- model.setMaxColumns(loadModel.getMaxColumns());
- model.setDateFormat(loadModel.getDateFormat());
- boolean hdfsReadMode =
- dataProcessTaskStatus.getCsvFilePath() != null
- && dataProcessTaskStatus.getCsvFilePath().startsWith("hdfs:");
- int allocate =
- null != dataProcessTaskStatus.getCsvFilePath()
- ? 1 : dataProcessTaskStatus.getFilesToProcess().size();
- GraphGenerator generator = new GraphGenerator(model, hdfsReadMode, loadModel.getPartitionId(),
- loadModel.getStorePath(), allocate,
- loadModel.getCarbonDataLoadSchema(), loadModel.getSegmentId(), outputLocation);
- generator.generateGraph();
- }
-
- public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
- String storePath, String kettleHomePath) throws Exception {
- System.setProperty("KETTLE_HOME", kettleHomePath);
- if (!new File(storeLocation).mkdirs()) {
- LOGGER.error("Error while creating the temp store path: " + storeLocation);
- }
- String outPutLoc = storeLocation + "/etl";
- String databaseName = loadModel.getDatabaseName();
- String tableName = loadModel.getTableName();
- String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
- + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, storePath);
- // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
- CarbonProperties.getInstance().addProperty("send.signal.load", "false");
-
- String fileNamePrefix = "";
- if (loadModel.isAggLoadRequest()) {
- fileNamePrefix = "graphgenerator";
- }
- String graphPath =
- outPutLoc + File.separator + databaseName + File.separator + tableName + File.separator
- + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo() + File.separator
- + tableName + fileNamePrefix + ".ktr";
- File path = new File(graphPath);
- if (path.exists()) {
- path.delete();
- }
-
- DataProcessTaskStatus dataProcessTaskStatus
- = new DataProcessTaskStatus(databaseName, tableName);
- dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
- dataProcessTaskStatus.setDimCSVDirLoc(loadModel.getDimFolderPath());
- if (loadModel.isDirectLoad()) {
- dataProcessTaskStatus.setFilesToProcess(loadModel.getFactFilesToProcess());
- dataProcessTaskStatus.setDirectLoad(true);
- dataProcessTaskStatus.setCsvDelimiter(loadModel.getCsvDelimiter());
- dataProcessTaskStatus.setCsvHeader(loadModel.getCsvHeader());
- }
-
- dataProcessTaskStatus.setBlocksID(loadModel.getBlocksID());
- dataProcessTaskStatus.setEscapeCharacter(loadModel.getEscapeChar());
- dataProcessTaskStatus.setQuoteCharacter(loadModel.getQuoteChar());
- dataProcessTaskStatus.setCommentCharacter(loadModel.getCommentChar());
- dataProcessTaskStatus.setRddIteratorKey(loadModel.getRddIteratorKey());
- dataProcessTaskStatus.setDateFormat(loadModel.getDateFormat());
- SchemaInfo info = new SchemaInfo();
-
- info.setDatabaseName(databaseName);
- info.setTableName(tableName);
- info.setAutoAggregateRequest(loadModel.isAggLoadRequest());
- info.setComplexDelimiterLevel1(loadModel.getComplexDelimiterLevel1());
- info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2());
- info.setSerializationNullFormat(loadModel.getSerializationNullFormat());
- info.setBadRecordsLoggerEnable(loadModel.getBadRecordsLoggerEnable());
- info.setBadRecordsLoggerAction(loadModel.getBadRecordsAction());
-
- generateGraph(dataProcessTaskStatus, info, loadModel, outPutLoc);
-
- DataGraphExecuter graphExecuter = new DataGraphExecuter(dataProcessTaskStatus);
- graphExecuter
- .executeGraph(graphPath, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN),
- info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
- }
-
- public static List<String> addNewSliceNameToList(String newSlice, List<String> activeSlices) {
- activeSlices.add(newSlice);
- return activeSlices;
- }
-
- public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
- CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
-
- for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
- deleteStorePath(segmentPath);
- }
- }
-
- public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
- final boolean isCompactionFlow) throws IOException {
- CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
- String metaDataLocation = carbonTable.getMetaDataFilepath();
- final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
-
- // delete folders whose metadata does not exist in tablestatus
- for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
- final String partitionCount = i + "";
- String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
- FileType fileType = FileFactory.getFileType(partitionPath);
- if (FileFactory.isFileExist(partitionPath, fileType)) {
- CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
- CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile path) {
- String segmentId =
- CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
- boolean found = false;
- for (int j = 0; j < details.length; j++) {
- if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
- .equals(partitionCount)) {
- found = true;
- break;
- }
- }
- return !found;
- }
- });
- for (int k = 0; k < listFiles.length; k++) {
- String segmentId =
- CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
- if (isCompactionFlow) {
- if (segmentId.contains(".")) {
- deleteStorePath(listFiles[k].getAbsolutePath());
- }
- } else {
- if (!segmentId.contains(".")) {
- deleteStorePath(listFiles[k].getAbsolutePath());
- }
- }
- }
- }
- }
- }
-
- public static void deleteStorePath(String path) {
- try {
- FileType fileType = FileFactory.getFileType(path);
- if (FileFactory.isFileExist(path, fileType)) {
- CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
- CarbonUtil.deleteFoldersAndFiles(carbonFile);
- }
- } catch (IOException e) {
- LOGGER.error("Unable to delete the given path :: " + e.getMessage());
- } catch (CarbonUtilException e) {
- LOGGER.error("Unable to delete the given path :: " + e.getMessage());
- }
- }
-
- public static List<String> getListOfValidSlices(LoadMetadataDetails[] details) {
- List<String> activeSlices =
- new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (LoadMetadataDetails oneLoad : details) {
- if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS.equals(oneLoad.getLoadStatus())
- || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(oneLoad.getLoadStatus())
- || CarbonCommonConstants.MARKED_FOR_UPDATE.equals(oneLoad.getLoadStatus())) {
- if (null != oneLoad.getMergedLoadName()) {
- String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getMergedLoadName();
- activeSlices.add(loadName);
- } else {
- String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
- activeSlices.add(loadName);
- }
- }
- }
- return activeSlices;
- }
-
- public static List<String> getListOfUpdatedSlices(LoadMetadataDetails[] details) {
- List<String> updatedSlices =
- new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (LoadMetadataDetails oneLoad : details) {
- if (CarbonCommonConstants.MARKED_FOR_UPDATE.equals(oneLoad.getLoadStatus())) {
- if (null != oneLoad.getMergedLoadName()) {
- updatedSlices.add(oneLoad.getMergedLoadName());
- } else {
- updatedSlices.add(oneLoad.getLoadName());
- }
- }
- }
- return updatedSlices;
- }
-
- public static void removeSliceFromMemory(String databaseName, String tableName, String loadName) {
- // TODO: Remove from memory
- }
-
- /**
- * This method will delete the local data load folder location after data load is complete
- *
- * @param loadModel
- */
- public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
- boolean isCompactionFlow) {
- String databaseName = loadModel.getDatabaseName();
- String tableName = loadModel.getTableName();
- String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
- + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
- if (isCompactionFlow) {
- tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
- }
- // form local store location
- String localStoreLocation = CarbonProperties.getInstance()
- .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
- try {
- CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile() });
- LOGGER.info("Deleted the local store location" + localStoreLocation);
- } catch (CarbonUtilException e) {
- LOGGER.error(e, "Failed to delete local data load folder location");
- }
-
- }
-
- /**
- * This method will get the store location for the given path, segment id and partition id
- *
- * @param storePath
- * @param carbonTableIdentifier
- * @param segmentId
- * @param partitionId
- * @return
- */
- public static String getStoreLocation(String storePath,
- CarbonTableIdentifier carbonTableIdentifier, String segmentId, String partitionId) {
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
- String carbonDataFilePath = carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
- return carbonDataFilePath;
- }
-
- /**
- * This API will write the load level metadata for the load management module in order to
- * manage load and query execution smoothly.
- *
- * @param loadCount
- * @param loadMetadataDetails
- * @param loadModel
- * @param loadStatus
- * @param startLoadTime
- * @return boolean which determines whether status update is done or not.
- * @throws IOException
- */
- public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails,
- CarbonLoadModel loadModel, String loadStatus, String startLoadTime) throws IOException {
-
- boolean status = false;
-
- String metaDataFilepath =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
-
- AbsoluteTableIdentifier absoluteTableIdentifier =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
-
- String tableStatusPath = carbonTablePath.getTableStatusFilePath();
- ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
-
- try {
- if (carbonLock.lockWithRetries()) {
- LOGGER.info(
- "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
- + " for table status updation");
-
- LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
- String loadEnddate = readCurrentTime();
- loadMetadataDetails.setTimestamp(loadEnddate);
- loadMetadataDetails.setLoadStatus(loadStatus);
- loadMetadataDetails.setLoadName(String.valueOf(loadCount));
- loadMetadataDetails.setLoadStartTime(startLoadTime);
-
- List<LoadMetadataDetails> listOfLoadFolderDetails =
- new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- if (null != listOfLoadFolderDetailsArray) {
- for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
- listOfLoadFolderDetails.add(loadMetadata);
- }
- }
- listOfLoadFolderDetails.add(loadMetadataDetails);
-
- SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
- .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
-
- status = true;
- } else {
- LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
- .getDatabaseName() + "." + loadModel.getTableName());
- }
- } finally {
- if (carbonLock.unlock()) {
- LOGGER.info(
- "Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
- + "." + loadModel.getTableName());
- } else {
- LOGGER.error(
- "Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
- .getTableName() + " during table status updation");
- }
- }
- return status;
- }
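
For illustration, the method above follows a strict acquire/write/unlock discipline around the table status file. A minimal standalone sketch of that pattern, using java.util.concurrent.locks.ReentrantLock as a stand-in for the file-based ICarbonLock (all names here are illustrative, not part of the original code):

import java.util.concurrent.locks.ReentrantLock;

// Sketch of the discipline recordLoadMetadata follows: take the table status
// lock, rewrite the full status file, and always unlock in finally.
// ReentrantLock is a stand-in; the real ICarbonLock is file based.
public class StatusUpdatePattern {
  private static final ReentrantLock TABLE_STATUS_LOCK = new ReentrantLock();

  static boolean updateStatus(Runnable writeStatusFile) {
    if (!TABLE_STATUS_LOCK.tryLock()) {
      return false; // could not acquire the lock, report failure
    }
    try {
      writeStatusFile.run(); // read existing entries, append the new one, write back
      return true;
    } finally {
      TABLE_STATUS_LOCK.unlock();
    }
  }

  public static void main(String[] args) {
    System.out.println(updateStatus(() -> System.out.println("writing tablestatus")));
  }
}
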
-
- public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
- String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(schema.getCarbonTable().getStorePath(),
- schema.getCarbonTable().getCarbonTableIdentifier());
- String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
-
- DataOutputStream dataOutputStream;
- Gson gsonObjectToWrite = new Gson();
- BufferedWriter brWriter = null;
-
- AtomicFileOperations writeOperation =
- new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
- try {
-
- dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
- brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
- String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
- brWriter.write(metadataInstance);
- } finally {
- try {
- if (null != brWriter) {
- brWriter.flush();
- }
- } catch (Exception e) {
- LOGGER.error("error in flushing ");
-
- }
- CarbonUtil.closeStreams(brWriter);
- writeOperation.close();
- }
-
- }
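
For reference, the status file written above is a single JSON array produced by Gson. A minimal sketch of that serialization, with a hypothetical LoadEntry class standing in for LoadMetadataDetails (assumes Gson on the classpath):

import com.google.gson.Gson;

// Sketch: the whole list of load details is serialized as one JSON array.
public class StatusSerialization {
  static class LoadEntry {
    String loadName;
    String loadStatus;
    LoadEntry(String name, String status) { loadName = name; loadStatus = status; }
  }

  public static void main(String[] args) {
    LoadEntry[] entries = { new LoadEntry("0", "Success"), new LoadEntry("1", "Compacted") };
    // Prints something like:
    // [{"loadName":"0","loadStatus":"Success"},{"loadName":"1","loadStatus":"Compacted"}]
    System.out.println(new Gson().toJson(entries));
  }
}
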
-
- public static String readCurrentTime() {
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
- return sdf.format(new Date());
- }
-
- public static String extractLoadMetadataFileLocation(CarbonLoadModel loadModel) {
- CarbonTable carbonTable =
- org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
- return carbonTable.getMetaDataFilepath();
- }
-
- public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
- String carbonStorePath) throws CarbonUtilException {
- Cache dictCache =
- CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
- return (Dictionary) dictCache.get(columnIdentifier);
- }
-
- public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
- ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
- throws CarbonUtilException {
- return getDictionary(
- new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType),
- carbonStorePath);
- }
-
- /**
- * This method will divide the blocks among the tasks of the nodes as per the data locality
- *
- * @param blockInfos
- * @param noOfNodesInput -1 if number of nodes has to be decided
- * based on block location information
- * @param parallelism total no of tasks to execute in parallel
- * @return
- */
- public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
- List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
- List<String> activeNode) {
-
- Map<String, List<Distributable>> mapOfNodes =
- CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
- int taskPerNode = parallelism / mapOfNodes.size();
- // assign a non-zero value to noOfTasksPerNode
- int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
- // divide the blocks of a node among the tasks of the node.
- return assignBlocksToTasksPerNode(mapOfNodes, noOfTasksPerNode);
- }
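
A standalone sketch of the task fan-out rule used above, with the zero guard that clamps the task count per node to at least one (names are illustrative):

// Sketch: tasks per node is parallelism / nodeCount, clamped to at least 1.
public class TaskFanOut {
  static int tasksPerNode(int parallelism, int nodeCount) {
    int taskPerNode = parallelism / nodeCount;
    return taskPerNode == 0 ? 1 : taskPerNode;
  }

  public static void main(String[] args) {
    System.out.println(tasksPerNode(8, 3)); // 2 tasks on each of the 3 nodes
    System.out.println(tasksPerNode(2, 4)); // 1 (clamped, more nodes than parallelism)
  }
}
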
-
- /**
- * This method will divide the blocks among the nodes as per the data locality
- *
- * @param blockInfos
- * @return
- */
- public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
- int noOfNodesInput) {
- return nodeBlockMapping(blockInfos, noOfNodesInput, null);
- }
-
- /**
- * This method will divide the blocks among the nodes as per the data locality
- *
- * @param blockInfos
- * @return
- */
- public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
- // -1 if number of nodes has to be decided based on block location information
- return nodeBlockMapping(blockInfos, -1);
- }
-
- /**
- * the method returns the number of required executors
- *
- * @param blockInfos
- * @return
- */
- public static Map<String, List<Distributable>> getRequiredExecutors(
- List<Distributable> blockInfos) {
- List<NodeBlockRelation> flattenedList =
- new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (Distributable blockInfo : blockInfos) {
- try {
- for (String eachNode : blockInfo.getLocations()) {
- NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
- flattenedList.add(nbr);
- }
- } catch (IOException e) {
- throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
- }
- }
- // sort the flattened data.
- Collections.sort(flattenedList);
- Map<String, List<Distributable>> nodeAndBlockMapping =
- new LinkedHashMap<String, List<Distributable>>(
- CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- // from the flattened list create a mapping of node vs Data blocks.
- createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
- return nodeAndBlockMapping;
- }
-
- /**
- * This method will divide the blocks among the nodes as per the data locality
- *
- * @param blockInfos
- * @param noOfNodesInput -1 if number of nodes has to be decided
- * based on block location information
- * @return
- */
- public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
- int noOfNodesInput, List<String> activeNodes) {
-
- Map<String, List<Distributable>> nodeBlocksMap =
- new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- List<NodeBlockRelation> flattenedList =
- new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- Set<Distributable> uniqueBlocks =
- new HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- Set<String> nodes = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
-
- int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
- if (null != activeNodes) {
- noofNodes = activeNodes.size();
- }
- int blocksPerNode = blockInfos.size() / noofNodes;
- blocksPerNode = blocksPerNode <= 0 ? 1 : blocksPerNode;
-
- // sort the flattened data.
- Collections.sort(flattenedList);
-
- Map<String, List<Distributable>> nodeAndBlockMapping =
- new LinkedHashMap<String, List<Distributable>>(
- CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- // from the flattened list create a mapping of node vs Data blocks.
- createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
-
- // so now we have a map of node vs blocks. allocate the block as per the order
- createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
-
- // if any blocks remain then assign them to nodes in round robin.
- assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
-
- return nodeBlocksMap;
- }
-
- /**
- * Assigning the blocks of a node to tasks.
- *
- * @param nodeBlocksMap nodeName to list of blocks mapping
- * @param noOfTasksPerNode
- * @return
- */
- private static Map<String, List<List<Distributable>>> assignBlocksToTasksPerNode(
- Map<String, List<Distributable>> nodeBlocksMap, int noOfTasksPerNode) {
- Map<String, List<List<Distributable>>> outputMap =
- new HashMap<String, List<List<Distributable>>>(
- CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- // for each node
- for (Map.Entry<String, List<Distributable>> eachNode : nodeBlocksMap.entrySet()) {
-
- List<Distributable> blockOfEachNode = eachNode.getValue();
- // sort the blocks so the same block is given to the same executor
- Collections.sort(blockOfEachNode);
- // create the task list for each node.
- createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
-
- // take all the block of node and divide it among the tasks of a node.
- divideBlockToTasks(outputMap, eachNode.getKey(), blockOfEachNode);
- }
-
- return outputMap;
- }
-
- /**
- * This will divide the blocks of a node to tasks of the node.
- *
- * @param outputMap
- * @param key
- * @param blockOfEachNode
- */
- private static void divideBlockToTasks(Map<String, List<List<Distributable>>> outputMap,
- String key, List<Distributable> blockOfEachNode) {
-
- List<List<Distributable>> taskLists = outputMap.get(key);
- int tasksOfNode = taskLists.size();
- int i = 0;
- for (Distributable block : blockOfEachNode) {
-
- taskLists.get(i % tasksOfNode).add(block);
- i++;
- }
-
- }
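
A standalone sketch of this round-robin split, with integers standing in for Distributable blocks:

import java.util.ArrayList;
import java.util.List;

// Sketch of the round-robin split used by divideBlockToTasks:
// block i goes to task bucket (i % numberOfTasks).
public class RoundRobinSplit {
  public static void main(String[] args) {
    int tasksOfNode = 3;
    List<List<Integer>> taskLists = new ArrayList<>();
    for (int t = 0; t < tasksOfNode; t++) {
      taskLists.add(new ArrayList<Integer>());
    }
    int i = 0;
    for (int block = 0; block < 7; block++) {
      taskLists.get(i % tasksOfNode).add(block);
      i++;
    }
    // Prints [[0, 3, 6], [1, 4], [2, 5]]
    System.out.println(taskLists);
  }
}
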
-
- /**
- * This will create the empty list for each task of a node.
- *
- * @param outputMap
- * @param noOfTasksPerNode
- * @param key
- */
- private static void createTaskListForNode(Map<String, List<List<Distributable>>> outputMap,
- int noOfTasksPerNode, String key) {
- List<List<Distributable>> nodeTaskList =
- new ArrayList<List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (int i = 0; i < noOfTasksPerNode; i++) {
- List<Distributable> eachTask =
- new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- nodeTaskList.add(eachTask);
-
- }
- outputMap.put(key, nodeTaskList);
-
- }
-
- /**
- * If any leftover data blocks are present then assign them to the nodes in a round-robin way.
- *
- * @param outputMap
- * @param uniqueBlocks
- */
- private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
- Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
-
- if (activeNodes != null) {
- for (String activeNode : activeNodes) {
- List<Distributable> blockLst = outputMap.get(activeNode);
- if (null == blockLst) {
- blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- }
- populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
- if (blockLst.size() > 0) {
- outputMap.put(activeNode, blockLst);
- }
- }
- } else {
- for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
- List<Distributable> blockLst = entry.getValue();
- populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
- }
-
- }
-
- for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
- Iterator<Distributable> blocks = uniqueBlocks.iterator();
- if (blocks.hasNext()) {
- Distributable block = blocks.next();
- List<Distributable> blockLst = entry.getValue();
- blockLst.add(block);
- blocks.remove();
- }
- }
- }
-
- /**
- * This method populates blockLst with the blocks to be allocated to a specific node.
- * @param uniqueBlocks
- * @param noOfBlocksPerNode
- * @param blockLst
- */
- private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
- List<Distributable> blockLst) {
- Iterator<Distributable> blocks = uniqueBlocks.iterator();
- // if the node already has its quota of blocks then avoid assigning extra blocks
- if (blockLst.size() == noOfBlocksPerNode) {
- return;
- }
- while (blocks.hasNext()) {
- Distributable block = blocks.next();
- blockLst.add(block);
- blocks.remove();
- if (blockLst.size() >= noOfBlocksPerNode) {
- break;
- }
- }
- }
-
- /**
- * To create the final output of the Node and Data blocks
- *
- * @param outputMap
- * @param blocksPerNode
- * @param uniqueBlocks
- * @param nodeAndBlockMapping
- * @param activeNodes
- */
- private static void createOutputMap(Map<String, List<Distributable>> outputMap, int blocksPerNode,
- Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> nodeAndBlockMapping,
- List<String> activeNodes) {
-
- ArrayList<NodeMultiBlockRelation> multiBlockRelations =
- new ArrayList<>(nodeAndBlockMapping.size());
- for (Map.Entry<String, List<Distributable>> entry : nodeAndBlockMapping.entrySet()) {
- multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), entry.getValue()));
- }
- // sort nodes based on the number of blocks per node, so that nodes having fewer blocks
- // are assigned first
- Collections.sort(multiBlockRelations);
-
- for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
- String nodeName = nodeMultiBlockRelation.getNode();
- //assign the block to the node only if the node is active
- String activeExecutor = nodeName;
- if (null != activeNodes) {
- activeExecutor = getActiveExecutor(activeNodes, nodeName);
- if (null == activeExecutor) {
- continue;
- }
- }
- // this loop will be for each NODE
- int nodeCapacity = 0;
- // loop through the blocks of each node
- for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
-
- // check if this is already assigned.
- if (uniqueBlocks.contains(block)) {
-
- if (null == outputMap.get(activeExecutor)) {
- List<Distributable> list =
- new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- outputMap.put(activeExecutor, list);
- }
- // assign this block to this node if node has capacity left
- if (nodeCapacity < blocksPerNode) {
- List<Distributable> infos = outputMap.get(activeExecutor);
- infos.add(block);
- nodeCapacity++;
- uniqueBlocks.remove(block);
- } else {
- // No need to continue loop as node is full
- break;
- }
- }
- }
- }
- }
-
- /**
- * Checks whether the given node is active.
- *
- * @param activeNode
- * @param nodeName
- * @return the matching active executor name if the node is active, else null.
- */
- private static String getActiveExecutor(List<String> activeNode, String nodeName) {
- boolean isActiveNode = activeNode.contains(nodeName);
- if (isActiveNode) {
- return nodeName;
- }
- // if the node is localhost then resolve the local hostname and check again
- else if (nodeName.equals("localhost")) {
- try {
- String hostName = InetAddress.getLocalHost().getHostName();
- isActiveNode = activeNode.contains(hostName);
- if(isActiveNode){
- return hostName;
- }
- } catch (UnknownHostException ue) {
- isActiveNode = false;
- }
- } else {
- try {
- String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
- isActiveNode = activeNode.contains(hostAddress);
- if(isActiveNode){
- return hostAddress;
- }
- } catch (UnknownHostException ue) {
- isActiveNode = false;
- }
- }
- return null;
- }
-
- /**
- * Create the Node and its related blocks Mapping and put in a Map
- *
- * @param flattenedList
- * @param nodeAndBlockMapping
- */
- private static void createNodeVsBlockMapping(List<NodeBlockRelation> flattenedList,
- Map<String, List<Distributable>> nodeAndBlockMapping) {
- for (NodeBlockRelation nbr : flattenedList) {
- String node = nbr.getNode();
- List<Distributable> list;
-
- if (null == nodeAndBlockMapping.get(node)) {
- list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- list.add(nbr.getBlock());
- Collections.sort(list);
- nodeAndBlockMapping.put(node, list);
- } else {
- list = nodeAndBlockMapping.get(node);
- list.add(nbr.getBlock());
- Collections.sort(list);
- }
- }
- }
-
- /**
- * Create the flat list of node-block relations, i.e. flatten the block location info.
- *
- * @param blockInfos
- * @param flattenedList
- * @param uniqueBlocks
- */
- private static void createFlattenedListFromMap(List<Distributable> blockInfos,
- List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
- Set<String> nodeList) {
- for (Distributable blockInfo : blockInfos) {
- // put the blocks in the set
- uniqueBlocks.add(blockInfo);
-
- try {
- for (String eachNode : blockInfo.getLocations()) {
- NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
- flattenedList.add(nbr);
- nodeList.add(eachNode);
- }
- } catch (IOException e) {
- throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
- }
- }
- }
-
- /**
- * This method will check and create the carbon data directory for each partition of the given segment
- *
- * @param carbonStorePath
- * @param dbName
- * @param tableName
- * @param partitionCount
- * @param segmentId
- */
- public static void checkAndCreateCarbonDataLocation(String carbonStorePath, String dbName,
- String tableName, int partitionCount, String segmentId) {
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(dbName + CarbonCommonConstants.UNDERSCORE + tableName);
- CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
- for (int i = 0; i < partitionCount; i++) {
- String carbonDataDirectoryPath =
- carbonTablePath.getCarbonDataDirectoryPath(String.valueOf(i), segmentId);
- CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
- }
- }
-
- /**
- * Returns the array of available local dirs.
- *
- * @param conf
- * @return
- */
- public static String[] getConfiguredLocalDirs(SparkConf conf) {
- return Utils.getConfiguredLocalDirs(conf);
- }
-
- /**
- * This will carry the visibility of the old table status entries (from before clean files) forward into the latest table status.
- * @param oldList
- * @param newList
- * @return
- */
- public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
- LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
-
- List<LoadMetadataDetails> newListMetadata =
- new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
- for (LoadMetadataDetails oldSegment : oldList) {
- if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
- newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
- }
- }
- return newListMetadata;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
deleted file mode 100644
index 2b3979f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name : Carbon
- * Module Name : CARBON spark interface
- * Author : R00903928
- * Created Date : 22-Sep-2015
- * FileName : DeleteLoadFolders.java
- * Description : for physical deletion of load folders.
- * Class Version : 1.0
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-
-public final class DeleteLoadFolders {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
-
- private DeleteLoadFolders() {
-
- }
-
- /**
- * returns segment path
- *
- * @param loadModel
- * @param storeLocation
- * @param partitionId
- * @param oneLoad
- * @return
- */
- private static String getSegmentPath(CarbonLoadModel loadModel, String storeLocation,
- int partitionId, LoadMetadataDetails oneLoad) {
-
- String path = null;
- String segmentId = oneLoad.getLoadName();
-
- path = new CarbonStorePath(storeLocation).getCarbonTablePath(
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())
- .getCarbonDataDirectoryPath("" + partitionId, segmentId);
- return path;
- }
-
- private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
-
- boolean status = false;
- try {
- if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
- CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
- CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile file) {
- return (CarbonTablePath.isCarbonDataFile(file.getName())
- || CarbonTablePath.isCarbonIndexFile(file.getName()));
- }
- });
-
- // if there are no fact or measure metadata files present then there is no need to keep
- // an entry in the metadata.
- if (filesToBeDeleted.length == 0) {
- status = true;
- } else {
-
- for (CarbonFile eachFile : filesToBeDeleted) {
- if (!eachFile.delete()) {
- LOGGER.warn("Unable to delete the file as per delete command "
- + eachFile.getAbsolutePath());
- status = false;
- } else {
- status = true;
- }
- }
- }
- // need to delete the complete folder.
- if(status){
- if(!file.delete()){
- LOGGER.warn("Unable to delete the folder as per delete command "
- + file.getAbsolutePath());
- status = false;
- }
- }
-
- } else {
- status = false;
- }
- } catch (IOException e) {
- LOGGER.warn("Unable to delete the file as per delete command " + path);
- }
-
- return status;
-
- }
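
A standalone analogue of the filter used above, written against java.io.FileFilter; the ".carbondata" and ".carbonindex" extensions and the segment path are assumptions for illustration:

import java.io.File;
import java.io.FileFilter;

// Sketch: select only the data and index files inside a segment folder.
public class FactFileFilterSketch {
  public static void main(String[] args) {
    FileFilter filter = file -> file.getName().endsWith(".carbondata")
        || file.getName().endsWith(".carbonindex");
    File[] toDelete = new File("/tmp/segment_0").listFiles(filter); // null if the dir is missing
    System.out.println(toDelete == null ? 0 : toDelete.length);
  }
}
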
-
- private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
- boolean isForceDelete) {
- if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus())
- || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
- && oneLoad.getVisibility().equalsIgnoreCase("true")) {
- if (isForceDelete) {
- return true;
- }
- String deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
- SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
- Date deletionDate = null;
- String date = null;
- Date currentTimeStamp = null;
- try {
- deletionDate = parser.parse(deletionTime);
- date = CarbonLoaderUtil.readCurrentTime();
- currentTimeStamp = parser.parse(date);
- } catch (ParseException e) {
- return false;
- }
-
- long difference = currentTimeStamp.getTime() - deletionDate.getTime();
-
- long minutesElapsed = (difference / (1000 * 60));
-
- int maxTime;
- try {
- maxTime = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
- } catch (NumberFormatException e) {
- maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
- }
- if (minutesElapsed > maxTime) {
- return true;
- }
-
- }
-
- return false;
- }
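
A standalone sketch of the elapsed-time check above; the timestamp pattern and the 60-minute default are assumptions for illustration:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch: a load marked for delete is only physically removable once more
// minutes have elapsed since its deletion timestamp than the configured
// max query execution time.
public class RetentionCheck {
  public static void main(String[] args) throws ParseException {
    SimpleDateFormat parser = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss"); // assumed format
    Date deletionDate = parser.parse("01-01-2016 10:00:00");
    Date now = parser.parse("01-01-2016 11:30:00");
    long minutesElapsed = (now.getTime() - deletionDate.getTime()) / (1000 * 60);
    int maxTime = 60; // assumed default max query execution time, in minutes
    System.out.println(minutesElapsed > maxTime); // true: 90 minutes elapsed, safe to delete
  }
}
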
-
- private static void factFileRenaming(String loadFolderPath) {
-
- FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
- try {
- if (FileFactory.isFileExist(loadFolderPath, fileType)) {
- CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
-
- CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile file) {
- return (file.getName().endsWith('_' + CarbonCommonConstants.FACT_FILE_UPDATED));
- }
- });
-
- for (CarbonFile file : listFiles) {
- if (!file.renameTo(file.getName().substring(0,
- file.getName().length() - CarbonCommonConstants.FACT_FILE_UPDATED.length()))) {
- LOGGER.warn("could not rename the updated fact file.");
- }
- }
-
- }
- } catch (IOException e) {
- LOGGER.error("exception" + e.getMessage());
- }
-
- }
-
- private static void cleanDeletedFactFile(String loadFolderPath) {
- FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
- try {
- if (FileFactory.isFileExist(loadFolderPath, fileType)) {
- CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
-
- CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile file) {
- return (file.getName().endsWith(CarbonCommonConstants.FACT_DELETE_EXTENSION));
- }
- });
-
- for (CarbonFile file : listFiles) {
- if (!file.delete()) {
- LOGGER.warn("could not delete the marked fact file.");
- }
- }
-
- }
- } catch (IOException e) {
- LOGGER.error("exception" + e.getMessage());
- }
- }
-
- /**
- * @param loadModel
- * @param storeLocation
- * @param isForceDelete
- * @param details
- * @return
- *
- */
- public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
- String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
- List<LoadMetadataDetails> deletedLoads =
- new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- boolean isDeleted = false;
-
- if (details != null && details.length != 0) {
- for (LoadMetadataDetails oneLoad : details) {
- if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
- String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
- boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
- if (deletionStatus) {
- isDeleted = true;
- oneLoad.setVisibility("false");
- deletedLoads.add(oneLoad);
- LOGGER.info("Info: " +
- " Deleted the load " + oneLoad.getLoadName());
- }
- }
- }
- }
-
- return isDeleted;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
deleted file mode 100644
index 0926e1c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name : Carbon
- * Module Name : CARBON Data Processor
- * Author : R00903928
- * Created Date : 21-Sep-2015
- * FileName : DeleteLoadFromMetadata.java
- * Description : Deletes loads from the table metadata.
- * Class Version : 1.0
- */
-package org.apache.carbondata.spark.load;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DeleteLoadFromMetadata {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DeleteLoadFromMetadata.class.getName());
-
- private DeleteLoadFromMetadata() {
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
deleted file mode 100644
index 661e17c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.load;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public class DeletedLoadMetadata implements Serializable {
-
- private static final long serialVersionUID = 7083059404172117208L;
- private Map<String, String> deletedLoadMetadataMap =
- new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- public void addDeletedLoadMetadata(String loadId, String status) {
- deletedLoadMetadataMap.put(loadId, status);
- }
-
- public List<String> getDeletedLoadMetadataIds() {
- return new ArrayList<String>(deletedLoadMetadataMap.keySet());
- }
-
- public String getDeletedLoadMetadataStatus(String loadId) {
- if (deletedLoadMetadataMap.containsKey(loadId)) {
- return deletedLoadMetadataMap.get(loadId);
- } else {
- return null;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
deleted file mode 100644
index 84e6c00..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ /dev/null
@@ -1,696 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.merger;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.integration.spark.merger.CompactionType;
-import org.apache.carbondata.lcm.locks.ICarbonLock;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.spark.load.CarbonLoaderUtil;
-
-/**
- * utility class for load merging.
- */
-public final class CarbonDataMergerUtil {
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName());
-
- private CarbonDataMergerUtil() {
-
- }
-
- /**
- * Returns the size of all the carbondata files present in the segment.
- * @param carbonFile
- * @return
- */
- private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
- long factSize = 0;
-
- // carbon data file case.
- CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile file) {
- return CarbonTablePath.isCarbonDataFile(file.getName());
- }
- });
-
- for (CarbonFile fact : factFile) {
- factSize += fact.getSize();
- }
-
- return factSize;
- }
-
- /**
- * To check whether the merge property is enabled or not.
- *
- * @return
- */
-
- public static boolean checkIfAutoLoadMergingRequired() {
- // load merge is not supported as per new store format
- // moving the load merge check in early to avoid unnecessary load listing causing IOException
- // check whether carbons segment merging operation is enabled or not.
- // default will be false.
- String isLoadMergeEnabled = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
- CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
- if (isLoadMergeEnabled.equalsIgnoreCase("false")) {
- return false;
- }
- return true;
- }
-
- /**
- * Form the Name of the New Merge Folder
- *
- * @param segmentsToBeMergedList
- * @return
- */
- public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) {
- String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName();
- if (firstSegmentName.contains(".")) {
- String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
- String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1);
- int fraction = Integer.parseInt(afterDecimal) + 1;
- String mergedSegmentName = beforeDecimal + "." + fraction;
- return CarbonCommonConstants.LOAD_FOLDER + mergedSegmentName;
- } else {
- String mergeName = firstSegmentName + "." + 1;
- return CarbonCommonConstants.LOAD_FOLDER + mergeName;
- }
-
- }
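
A standalone sketch of this naming rule, assuming CarbonCommonConstants.LOAD_FOLDER is "Segment_":

// Sketch: a first-level compaction of segment "2" yields "Segment_2.1";
// compacting "2.1" again yields "Segment_2.2".
public class MergedNameRule {
  static String mergedName(String firstSegmentName) {
    String prefix = "Segment_"; // assumed value of LOAD_FOLDER
    if (firstSegmentName.contains(".")) {
      String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf('.'));
      int fraction = Integer.parseInt(firstSegmentName.substring(firstSegmentName.indexOf('.') + 1)) + 1;
      return prefix + beforeDecimal + "." + fraction;
    }
    return prefix + firstSegmentName + ".1";
  }

  public static void main(String[] args) {
    System.out.println(mergedName("2"));   // Segment_2.1
    System.out.println(mergedName("2.1")); // Segment_2.2
  }
}
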
-
- public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
- String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
- String mergeLoadStartTime, CompactionType compactionType) {
-
- boolean tableStatusUpdationStatus = false;
- AbsoluteTableIdentifier absoluteTableIdentifier =
- carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
-
- try {
- if (carbonLock.lockWithRetries()) {
- LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
- + carbonLoadModel.getTableName() + " for table status updation ");
-
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
-
- String statusFilePath = carbonTablePath.getTableStatusFilePath();
-
- LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
- String mergedLoadNumber = MergedLoadName.substring(
- MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
- + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
-
- String modificationOrDeletionTimeStamp = CarbonLoaderUtil.readCurrentTime();
- for (LoadMetadataDetails loadDetail : loadDetails) {
- // check if this segment is one of the loads being compacted.
- if (loadsToMerge.contains(loadDetail)) {
- // if the compacted load is deleted after the start of the compaction process,
- // then need to discard the compaction process and treat it as failed compaction.
- if (loadDetail.getLoadStatus()
- .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
- LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
- + " is deleted after the compaction is started.");
- return tableStatusUpdationStatus;
- }
- loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED);
- loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
- loadDetail.setMergedLoadName(mergedLoadNumber);
- }
- }
-
- // create entry for merged one.
- LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
- loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
- String loadEnddate = CarbonLoaderUtil.readCurrentTime();
- loadMetadataDetails.setTimestamp(loadEnddate);
- loadMetadataDetails.setLoadName(mergedLoadNumber);
- loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
- loadMetadataDetails.setPartitionCount("0");
- // if this is a major compaction then set the segment as major compaction.
- if (compactionType == CompactionType.MAJOR_COMPACTION) {
- loadMetadataDetails.setMajorCompacted("true");
- }
-
- List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
-
- // put the merged folder entry
- updatedDetailsList.add(loadMetadataDetails);
-
- try {
- SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
- updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
- tableStatusUpdationStatus = true;
- } catch (IOException e) {
- LOGGER.error("Error while writing metadata");
- }
- } else {
- LOGGER.error(
- "Could not obtain lock for table " + carbonLoadModel.getDatabaseName() + "."
- + carbonLoadModel.getTableName() + " for table status update");
- }
- } finally {
- if (carbonLock.unlock()) {
- LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
- .getDatabaseName() + "." + carbonLoadModel.getTableName());
- } else {
- LOGGER.error(
- "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
- + carbonLoadModel.getTableName() + " during table status updation");
- }
- }
- return tableStatusUpdationStatus;
- }
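
Stripped of the status-file bookkeeping, the method above follows the usual lock-with-retries, try/finally-unlock skeleton; a condensed sketch using only the lock API already seen in the code:

    ICarbonLock lock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
    try {
      if (lock.lockWithRetries()) {
        // read the load details, mark compacted segments, append the merged
        // segment entry, then rewrite the table status file.
      } else {
        // lock not acquired: report failure so the compaction can be retried.
      }
    } finally {
      lock.unlock(); // always release, whether or not the update succeeded
    }
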
-
- /**
- * Identifies which segments can be merged.
- *
- * @param storeLocation the table store path
- * @param carbonLoadModel load model holding the table metadata
- * @param compactionSize major compaction size threshold, in MB
- * @param segments candidate segments
- * @param compactionType major or minor compaction
- * @return the list of segments selected for merging
- */
- public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
- CarbonLoadModel carbonLoadModel, long compactionSize,
- List<LoadMetadataDetails> segments, CompactionType compactionType) {
-
- List<LoadMetadataDetails> sortedSegments = new ArrayList<>(segments);
-
- sortSegments(sortedSegments);
-
- // check preserve property and preserve the configured number of latest loads.
-
- List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
- checkPreserveSegmentsPropertyReturnRemaining(sortedSegments);
-
- // filter the segments if the compaction based on days is configured.
-
- List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval =
- identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
- List<LoadMetadataDetails> listOfSegmentsToBeMerged;
- // identify the segments to merge, based on size for major compaction or on segment count otherwise.
- if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
-
- listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
- listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
- } else {
-
- listOfSegmentsToBeMerged =
- identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval);
- }
-
- return listOfSegmentsToBeMerged;
- }
-
- /**
- * Sorts the segments numerically by load name.
- *
- * @param segments segment details to sort in place
- */
- public static void sortSegments(List<LoadMetadataDetails> segments) {
- // sort the segment details.
- Collections.sort(segments, new Comparator<LoadMetadataDetails>() {
- @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
- double seg1Id = Double.parseDouble(seg1.getLoadName());
- double seg2Id = Double.parseDouble(seg2.getLoadName());
- return Double.compare(seg1Id, seg2Id);
- }
- });
- }
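
The numeric parse matters because merged segments carry fractional names; a plain string sort would misplace them. A self-contained illustration (Java 8 comparator form, hypothetical names):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public final class SegmentOrderExample {
      public static void main(String[] args) {
        List<String> names = new ArrayList<>(Arrays.asList("0", "0.1", "2", "10"));
        Collections.sort(names);
        System.out.println(names); // [0, 0.1, 10, 2] -- lexicographic, wrong order
        names.sort(Comparator.comparingDouble(Double::parseDouble));
        System.out.println(names); // [0, 0.1, 2, 10] -- numeric, as sortSegments does
      }
    }
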
-
- /**
- * Returns the list of loads which were loaded within the same, configurable, date interval.
- *
- * @param listOfSegmentsBelowThresholdSize candidate segments after the preserve check
- * @return segments whose load dates fall within the allowed number of days
- */
- private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate(
- List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) {
-
- List<LoadMetadataDetails> loadsOfSameDate =
- new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- long numberOfDaysAllowedToMerge = 0;
- try {
- numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
- CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT));
- if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) {
- LOGGER.error(
- "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT
- + " is incorrect."
- + " Correct value should be in the range 0-100. Taking the default value.");
- numberOfDaysAllowedToMerge =
- Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
- }
-
- } catch (NumberFormatException e) {
- numberOfDaysAllowedToMerge =
- Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
- }
- // if a positive day count is configured, process loads according to the load date.
- if (numberOfDaysAllowedToMerge > 0) {
-
- // filter loads based on the loaded date
- boolean first = true;
- Date segDate1 = null;
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
- for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
-
- if (first) {
- segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
- first = false;
- continue;
- }
- String segmentDate = segment.getLoadStartTime();
- Date segDate2 = null;
- try {
- segDate2 = sdf.parse(segmentDate);
- } catch (ParseException e) {
- LOGGER.error("Error while parsing segment start time" + e.getMessage());
- }
-
- if (isTwoDatesPresentInRequiredRange(segDate1, segDate2, numberOfDaysAllowedToMerge)) {
- loadsOfSameDate.add(segment);
- }
- // if the load is beyond the merge date,
- // then reset everything and continue searching for loads.
- else if (loadsOfSameDate.size() < 2) {
- loadsOfSameDate.clear();
- // need to add the next segment as first and to check further
- segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
- } else { // a load is beyond the merge date and there are at least 2 loads to merge.
- break;
- }
- }
- } else {
- return listOfSegmentsBelowThresholdSize;
- }
-
- return loadsOfSameDate;
- }
-
- /**
- * Adds the given segment as the first load of a date window and returns its parsed start date.
- *
- * @param loadsOfSameDate running list of loads in the current date window
- * @param segment the segment to use as the new baseline
- * @param sdf formatter for the load start time
- * @return the parsed load start date, or null if parsing fails
- */
- private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate,
- LoadMetadataDetails segment, SimpleDateFormat sdf) {
- String baselineLoadStartTime = segment.getLoadStartTime();
- Date segDate1 = null;
- try {
- segDate1 = sdf.parse(baselineLoadStartTime);
- } catch (ParseException e) {
- LOGGER.error("Error while parsing segment start time" + e.getMessage());
- }
- loadsOfSameDate.add(segment);
- return segDate1;
- }
-
- /**
- * Checks whether the two load dates fall within the configured number of days.
- *
- * @param segDate1 start date of the first load
- * @param segDate2 start date of the second load
- * @param numberOfDaysAllowedToMerge allowed day window
- * @return true if both dates lie within the allowed range
- */
- private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segDate2,
- long numberOfDaysAllowedToMerge) {
- if (segDate1 == null || segDate2 == null) {
- return false;
- }
- // take the first date and add the configured number of days.
- Calendar cal1 = Calendar.getInstance();
- cal1.set(segDate1.getYear(), segDate1.getMonth(), segDate1.getDate());
- Calendar cal2 = Calendar.getInstance();
- cal2.set(segDate2.getYear(), segDate2.getMonth(), segDate2.getDate());
-
- long diff = cal2.getTimeInMillis() - cal1.getTimeInMillis();
-
- return (diff / (24 * 60 * 60 * 1000)) < numberOfDaysAllowedToMerge;
- }
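
On Java 8 the deprecated java.util.Date getters used above could give way to java.time; a behavior-equivalent sketch, assuming the same "strictly less than N days" rule:

    import java.time.ZoneId;
    import java.time.temporal.ChronoUnit;
    import java.util.Date;

    final class DateWindowCheck {
      static boolean withinAllowedDays(Date first, Date second, long allowedDays) {
        if (first == null || second == null) {
          return false;
        }
        // truncate both dates to local calendar days, then diff in whole days.
        long diff = ChronoUnit.DAYS.between(
            first.toInstant().atZone(ZoneId.systemDefault()).toLocalDate(),
            second.toInstant().atZone(ZoneId.systemDefault()).toLocalDate());
        return diff < allowedDays;
      }
    }
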
-
- /**
- * Identifies the segments to be merged based on size, for major compaction.
- *
- * @param compactionSize major compaction size threshold, in MB
- * @param listOfSegmentsAfterPreserve candidate segments after the preserve check
- * @param carbonLoadModel load model holding the table metadata
- * @param storeLocation the table store path
- * @return segments whose combined size stays within the threshold
- */
- private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
- long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
- CarbonLoadModel carbonLoadModel, String storeLocation) {
-
- List<LoadMetadataDetails> segmentsToBeMerged =
- new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- CarbonTableIdentifier tableIdentifier =
- carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
-
-
- // total length
- long totalLength = 0;
-
- // check the size of each segment and sum it up across partitions.
- for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-
- String segId = segment.getLoadName();
- // variable to store one segment size across partition.
- long sizeOfOneSegmentAcrossPartition =
- getSizeOfSegment(storeLocation, tableIdentifier, segId);
-
- // if the size of a segment is greater than the major compaction size, then ignore it.
- if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
- // if already 2 segments have been found for merging then stop scan here and merge.
- if (segmentsToBeMerged.size() > 1) {
- break;
- } else { // if only one segment is found then remove the earlier one in list.
- // reset the total length to 0.
- segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- totalLength = 0;
- continue;
- }
- }
-
- totalLength += sizeOfOneSegmentAcrossPartition;
-
- // add the segment as long as the accumulated size stays within the compaction threshold.
- if (totalLength < (compactionSize * 1024 * 1024)) {
- segmentsToBeMerged.add(segment);
- } else { // if already 2 segments have been found for merging then stop scan here and merge.
- if (segmentsToBeMerged.size() > 1) {
- break;
- } else { // if only one segment is found then remove the earlier one in list and put this.
- // reset the total length to the current identified segment.
- segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- segmentsToBeMerged.add(segment);
- totalLength = sizeOfOneSegmentAcrossPartition;
- }
- }
-
- }
-
- return segmentsToBeMerged;
- }
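
A unit-free rehearsal of the selection loop above makes the thresholding concrete (sizes in MB, hypothetical 1024 MB threshold):

    import java.util.ArrayList;
    import java.util.List;

    public final class SizeSelectionExample {
      public static void main(String[] args) {
        long threshold = 1024, total = 0;
        long[] sizesMb = {300, 400, 200, 2000};
        List<Long> picked = new ArrayList<>();
        for (long size : sizesMb) {
          if (size > threshold) {         // oversized segment on its own
            if (picked.size() > 1) break; // enough candidates already collected
            picked.clear(); total = 0; continue;
          }
          total += size;
          if (total < threshold) {
            picked.add(size);
          } else if (picked.size() > 1) {
            break;
          } else {                        // restart the window at this segment
            picked.clear(); picked.add(size); total = size;
          }
        }
        System.out.println(picked); // [300, 400, 200]
      }
    }
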
-
- /**
- * Calculates the size of the specified segment.
- *
- * @param storeLocation the table store path
- * @param tableIdentifier identifier of the table
- * @param segId segment id
- * @return size of the segment's fact files, in bytes
- */
- private static long getSizeOfSegment(String storeLocation,
- CarbonTableIdentifier tableIdentifier, String segId) {
- String loadPath = CarbonLoaderUtil
- .getStoreLocation(storeLocation, tableIdentifier, segId, "0");
- CarbonFile segmentFolder =
- FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
- return getSizeOfFactFileInLoad(segmentFolder);
- }
-
- /**
- * Identifies the segments to be merged based on the segment count, for minor compaction.
- *
- * @param listOfSegmentsAfterPreserve candidate segments after the preserve check
- * @return segments selected for the level-1 or level-2 merge, or an empty list
- */
- private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
- List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
-
- List<LoadMetadataDetails> mergedSegments =
- new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- List<LoadMetadataDetails> unMergedSegments =
- new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- int[] noOfSegmentLevelsCount =
- CarbonProperties.getInstance().getCompactionSegmentLevelCount();
-
- int level1Size = 0;
- int level2Size = 0;
- int size = noOfSegmentLevelsCount.length;
-
- if (size >= 2) {
- level1Size = noOfSegmentLevelsCount[0];
- level2Size = noOfSegmentLevelsCount[1];
- } else if (size == 1) {
- level1Size = noOfSegmentLevelsCount[0];
- }
-
- int unMergeCounter = 0;
- int mergeCounter = 0;
-
- // walk the segments, counting unmerged and once-merged loads separately.
- for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-
- String segName = segment.getLoadName();
-
- // if a segment has already been merged 2 levels then its name will end with .2;
- // such segments must be excluded from minor compaction.
- // a major compacted segment should also not be considered for minor compaction.
- if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || (
- segment.isMajorCompacted() != null && segment.isMajorCompacted()
- .equalsIgnoreCase("true"))) {
- continue;
- }
-
- // check if the segment is merged or not
-
- if (!isMergedSegment(segName)) {
- // if it is an unmerged segment then increment the counter.
- unMergeCounter++;
- unMergedSegments.add(segment);
- if (unMergeCounter == (level1Size)) {
- return unMergedSegments;
- }
- } else {
- mergeCounter++;
- mergedSegments.add(segment);
- if (mergeCounter == (level2Size)) {
- return mergedSegments;
- }
- }
- }
- return new ArrayList<>(0);
- }
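
The counting rule reduces to two independent tallies; a condensed rehearsal with hypothetical level counts (4, 3) and segment names:

    public final class LevelCountExample {
      public static void main(String[] args) {
        String[] names = {"0.1", "4", "5", "6", "7"};
        int level1 = 4, level2 = 3, unmerged = 0, merged = 0;
        for (String name : names) {
          if (!name.contains(".")) {      // unmerged segment
            if (++unmerged == level1) {
              System.out.println("level-1 merge of " + level1 + " unmerged loads");
              break;
            }
          } else {                        // once-merged segment
            if (++merged == level2) {
              System.out.println("level-2 merge of " + level2 + " merged loads");
              break;
            }
          }
        }
      }
    }

(The real method additionally skips twice-merged and major-compacted segments, and returns the segments themselves rather than a count.)
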
-
- /**
- * Checks whether the segment has already been merged.
- *
- * @param segName segment name
- * @return true if the name contains a merge level suffix
- */
- private static boolean isMergedSegment(String segName) {
- if(segName.contains(".")){
- return true;
- }
- return false;
- }
-
- /**
- * Checks the number of loads to be preserved and returns the remaining valid segments.
- *
- * @param segments candidate segments
- * @return valid segments left after preserving the configured number of latest loads
- */
- */
- private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining(
- List<LoadMetadataDetails> segments) {
-
- // check whether preserving segments from merging is enabled and
- // get the number of loads to be preserved.
- int numberOfSegmentsToBePreserved =
- CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved();
- // get the number of valid segments and retain the latest loads from merging.
- return CarbonDataMergerUtil
- .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved);
- }
-
- /**
- * Retains the configured number of latest segments and returns the remaining list of
- * valid segments.
- *
- * @param loadMetadataDetails all load details
- * @param numberOfSegToBeRetained number of latest segments to keep out of merging
- * @return the valid segments left after retention
- */
- private static List<LoadMetadataDetails> getValidLoadDetailsWithRetaining(
- List<LoadMetadataDetails> loadMetadataDetails, int numberOfSegToBeRetained) {
-
- List<LoadMetadataDetails> validList =
- new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (LoadMetadataDetails segment : loadMetadataDetails) {
- if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- || segment.getLoadStatus()
- .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || segment
- .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE)) {
- validList.add(segment);
- }
- }
-
- // remove the configured number of latest segments from the tail of the list.
- for (int i = 0; i < numberOfSegToBeRetained && !validList.isEmpty(); i++) {
- validList.remove(validList.size() - 1);
- }
- return validList;
-
- }
-
- /**
- * Returns the compaction size configured for the given compaction type.
- *
- * @param compactionType major or minor compaction
- * @return the configured compaction size in MB, or 0 for minor compaction
- */
- public static long getCompactionSize(CompactionType compactionType) {
-
- long compactionSize = 0;
- switch (compactionType) {
- case MAJOR_COMPACTION:
- compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
- break;
- default: // minor compaction has no size threshold; return 0.
- }
- return compactionSize;
- }
-
- /**
- * Returns the comma separated list of valid segment names for merging.
- *
- * @param loadMetadataDetails segments selected for merging
- * @return comma separated segment names
- */
- public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
- StringBuilder builder = new StringBuilder();
- for (LoadMetadataDetails segment : loadMetadataDetails) {
- // if this load has already been merged, refer to it by its merged load name.
- if (null != segment.getMergedLoadName()) {
- builder.append(segment.getMergedLoadName()).append(",");
- } else {
- builder.append(segment.getLoadName()).append(",");
- }
- }
- // drop the trailing comma, guarding against an empty segment list.
- if (builder.length() > 0) {
- builder.deleteCharAt(builder.length() - 1);
- }
- return builder.toString();
- }
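
On Java 8 a StringJoiner yields the same string without the trailing-comma fix-up, and it naturally returns an empty string for an empty list; a hedged alternative sketch:

    import java.util.List;
    import java.util.StringJoiner;

    static String validSegments(List<LoadMetadataDetails> details) {
      StringJoiner joiner = new StringJoiner(",");
      for (LoadMetadataDetails segment : details) {
        // prefer the merged load name when the segment was already merged.
        joiner.add(segment.getMergedLoadName() != null
            ? segment.getMergedLoadName() : segment.getLoadName());
      }
      return joiner.toString(); // e.g. "0.1,4,5"
    }
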
-
- /**
- * Combines a list of node-to-block maps into a single map.
- *
- * @param mapsOfNodeBlockMapping per-task node to block mappings
- * @return one map with the block lists of each node merged together
- */
- public static Map<String, List<TableBlockInfo>> combineNodeBlockMaps(
- List<Map<String, List<TableBlockInfo>>> mapsOfNodeBlockMapping) {
-
- Map<String, List<TableBlockInfo>> combinedMap =
- new HashMap<String, List<TableBlockInfo>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- // traverse list of maps.
- for (Map<String, List<TableBlockInfo>> eachMap : mapsOfNodeBlockMapping) {
- // traverse inside each map.
- for (Map.Entry<String, List<TableBlockInfo>> eachEntry : eachMap.entrySet()) {
-
- String node = eachEntry.getKey();
- List<TableBlockInfo> blocks = eachEntry.getValue();
-
- List<TableBlockInfo> blocksAlreadyPresent = combinedMap.get(node);
- // if details for this node already exist in the combined map, extend them.
- if (null != blocksAlreadyPresent) {
- blocksAlreadyPresent.addAll(blocks);
- } else { // if it is not present in the map yet, add a new entry.
- combinedMap.put(node, blocks);
- }
- }
- }
- return combinedMap;
- }
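
With Java 8's computeIfAbsent the combine step loses the explicit branch. Note that this variant always copies into fresh lists instead of mutating the lists of the first map in place, a deliberate difference from the loop above:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    static Map<String, List<TableBlockInfo>> combine(
        List<Map<String, List<TableBlockInfo>>> maps) {
      Map<String, List<TableBlockInfo>> combined = new HashMap<>();
      for (Map<String, List<TableBlockInfo>> each : maps) {
        for (Map.Entry<String, List<TableBlockInfo>> entry : each.entrySet()) {
          combined.computeIfAbsent(entry.getKey(), node -> new ArrayList<>())
              .addAll(entry.getValue());
        }
      }
      return combined;
    }
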
-
- /**
- * Filters out the segments added after compaction started, keeping loads up to lastSeg.
- */
- public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
- List<LoadMetadataDetails> segments,
- LoadMetadataDetails lastSeg) {
-
- // take complete list of segments.
- List<LoadMetadataDetails> list = new ArrayList<>(segments);
- // sort list
- CarbonDataMergerUtil.sortSegments(list);
-
- // first filter out newly added segments.
- return list.subList(0, list.indexOf(lastSeg) + 1);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
deleted file mode 100644
index 93ba2a3..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-
-/**
- * Block to Node mapping
- */
-public class NodeBlockRelation implements Comparable<NodeBlockRelation> {
-
- private final Distributable block;
- private final String node;
-
- public NodeBlockRelation(Distributable block, String node) {
- this.block = block;
- this.node = node;
-
- }
-
- public Distributable getBlock() {
- return block;
- }
-
- public String getNode() {
- return node;
- }
-
- @Override public int compareTo(NodeBlockRelation obj) {
- return this.getNode().compareTo(obj.getNode());
- }
-
- @Override public boolean equals(Object obj) {
- if (!(obj instanceof NodeBlockRelation)) {
- return false;
- }
- NodeBlockRelation o = (NodeBlockRelation) obj;
- return node.equals(o.node);
- }
-
- @Override public int hashCode() {
- return node.hashCode();
- }
-}
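
Because both compareTo and equals key on the node name, sorting a list of relations clusters all the blocks of one node together; a small usage sketch (the null stand-in for the block is safe here, since the ordering never touches it):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.carbondata.core.carbon.datastore.block.Distributable;

    public final class NodeGroupingExample {
      public static void main(String[] args) {
        Distributable block = null; // hypothetical stand-in
        List<NodeBlockRelation> relations = new ArrayList<>();
        relations.add(new NodeBlockRelation(block, "node-2"));
        relations.add(new NodeBlockRelation(block, "node-1"));
        relations.add(new NodeBlockRelation(block, "node-2"));
        Collections.sort(relations); // orders by node name only
        // entries for the same node are now adjacent, so one linear pass
        // can assemble a node -> blocks mapping.
      }
    }
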
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
deleted file mode 100644
index 6b4d1bc..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import java.util.List;
-
-import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-
-public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> {
-
- private final List<Distributable> blocks;
- private final String node;
-
- public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
- this.node = node;
- this.blocks = blocks;
-
- }
-
- public List<Distributable> getBlocks() {
- return blocks;
- }
-
- public String getNode() {
- return node;
- }
-
- @Override public int compareTo(NodeMultiBlockRelation obj) {
- return this.blocks.size() - obj.getBlocks().size();
- }
-
- @Override public boolean equals(Object obj) {
- if (!(obj instanceof NodeMultiBlockRelation)) {
- return false;
- }
- NodeMultiBlockRelation o = (NodeMultiBlockRelation) obj;
- return blocks.equals(o.blocks) && node.equals(o.node);
- }
-
- @Override public int hashCode() {
- return blocks.hashCode() + node.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
deleted file mode 100644
index 58f3a2d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.util.List;
-
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-
-import org.apache.spark.sql.execution.command.Partitioner;
-
-public interface DataPartitioner {
- /**
- * Initialise the partitioner based on the given columns
- */
- void initialize(String basePath, String[] columns, Partitioner partitioner);
-
- /**
- * All the partitions built by the Partitioner
- */
- List<Partition> getAllPartitions();
-
- /**
- * Partition where the tuple should be present. (API used for data loading purpose)
- */
- Partition getPartionForTuple(Object[] tuple, long rowCounter);
-
- /**
- * Identifies the partitions applicable for the given filter (API used for query)
- */
- List<Partition> getPartitions(CarbonQueryPlan queryPlan);
-
- String[] getPartitionedColumns();
-
- Partitioner getPartitioner();
-
-}
-