You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:50 UTC
[12/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
new file mode 100644
index 0000000..836a757
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.merger;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.lcm.locks.ICarbonLock;
+import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.spark.load.CarbonLoaderUtil;
+
+/**
+ * utility class for load merging.
+ */
+public final class CarbonDataMergerUtil {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName());
+
+ private CarbonDataMergerUtil() {
+
+ }
+
+ /**
+ * Returns the size of all the carbondata files present in the segment.
+ * @param carbonFile
+ * @return
+ */
+ private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
+ long factSize = 0;
+
+ // carbon data file case.
+ CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
+
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonDataFile(file.getName());
+ }
+ });
+
+ for (CarbonFile fact : factFile) {
+ factSize += fact.getSize();
+ }
+
+ return factSize;
+ }
+
+ /**
+ * To check whether the merge property is enabled or not.
+ *
+ * @return
+ */
+
+ public static boolean checkIfAutoLoadMergingRequired() {
+ // load merge is not supported as per new store format
+ // moving the load merge check in early to avoid unnecessary load listing causing IOException
+ // check whether carbons segment merging operation is enabled or not.
+ // default will be false.
+ String isLoadMergeEnabled = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+ CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
+ if (isLoadMergeEnabled.equalsIgnoreCase("false")) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Form the Name of the New Merge Folder
+ *
+ * @param segmentsToBeMergedList
+ * @return
+ */
+ public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) {
+ String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName();
+ if (firstSegmentName.contains(".")) {
+ String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
+ String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1);
+ int fraction = Integer.parseInt(afterDecimal) + 1;
+ String mergedSegmentName = beforeDecimal + "." + fraction;
+ return CarbonCommonConstants.LOAD_FOLDER + mergedSegmentName;
+ } else {
+ String mergeName = firstSegmentName + "." + 1;
+ return CarbonCommonConstants.LOAD_FOLDER + mergeName;
+ }
+
+ }
+
+ public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
+ String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
+ String mergeLoadStartTime, CompactionType compactionType) {
+
+ boolean tableStatusUpdationStatus = false;
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+ ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
+
+ try {
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
+ + carbonLoadModel.getTableName() + " for table status updation ");
+
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
+
+ String statusFilePath = carbonTablePath.getTableStatusFilePath();
+
+ LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+ String mergedLoadNumber = MergedLoadName.substring(
+ MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
+ + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
+
+ String modificationOrDeletionTimeStamp = CarbonLoaderUtil.readCurrentTime();
+ for (LoadMetadataDetails loadDetail : loadDetails) {
+ // check if this segment is merged.
+ if (loadsToMerge.contains(loadDetail)) {
+ // if the compacted load is deleted after the start of the compaction process,
+ // then need to discard the compaction process and treat it as failed compaction.
+ if (loadDetail.getLoadStatus()
+ .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+ LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
+ + " is deleted after the compaction is started.");
+ return tableStatusUpdationStatus;
+ }
+ loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED);
+ loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
+ loadDetail.setMergedLoadName(mergedLoadNumber);
+ }
+ }
+
+ // create entry for merged one.
+ LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+ loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
+ String loadEnddate = CarbonLoaderUtil.readCurrentTime();
+ loadMetadataDetails.setTimestamp(loadEnddate);
+ loadMetadataDetails.setLoadName(mergedLoadNumber);
+ loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
+ loadMetadataDetails.setPartitionCount("0");
+ // if this is a major compaction then set the segment as major compaction.
+ if (compactionType == CompactionType.MAJOR_COMPACTION) {
+ loadMetadataDetails.setMajorCompacted("true");
+ }
+
+ List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
+
+ // put the merged folder entry
+ updatedDetailsList.add(loadMetadataDetails);
+
+ try {
+ SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
+ updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
+ tableStatusUpdationStatus = true;
+ } catch (IOException e) {
+ LOGGER.error("Error while writing metadata");
+ }
+ } else {
+ LOGGER.error(
+ "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
+ + carbonLoadModel.getTableName() + "for table status updation");
+ }
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
+ .getDatabaseName() + "." + carbonLoadModel.getTableName());
+ } else {
+ LOGGER.error(
+ "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
+ + carbonLoadModel.getTableName() + " during table status updation");
+ }
+ }
+ return tableStatusUpdationStatus;
+ }
+
+ /**
+ * To identify which all segments can be merged.
+ *
+ * @param storeLocation
+ * @param carbonLoadModel
+ * @param compactionSize
+ * @return
+ */
+ public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
+ CarbonLoadModel carbonLoadModel, long compactionSize,
+ List<LoadMetadataDetails> segments, CompactionType compactionType) {
+
+ List sortedSegments = new ArrayList(segments);
+
+ sortSegments(sortedSegments);
+
+ // check preserve property and preserve the configured number of latest loads.
+
+ List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
+ checkPreserveSegmentsPropertyReturnRemaining(sortedSegments);
+
+ // filter the segments if the compaction based on days is configured.
+
+ List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval =
+ identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
+ List<LoadMetadataDetails> listOfSegmentsToBeMerged;
+ // identify the segments to merge based on the Size of the segments across partition.
+ if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
+
+ listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
+ listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
+ } else {
+
+ listOfSegmentsToBeMerged =
+ identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval);
+ }
+
+ return listOfSegmentsToBeMerged;
+ }
+
+ /**
+ * Sorting of the segments.
+ * @param segments
+ */
+ public static void sortSegments(List segments) {
+ // sort the segment details.
+ Collections.sort(segments, new Comparator<LoadMetadataDetails>() {
+ @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
+ double seg1Id = Double.parseDouble(seg1.getLoadName());
+ double seg2Id = Double.parseDouble(seg2.getLoadName());
+ if (seg1Id - seg2Id < 0) {
+ return -1;
+ }
+ if (seg1Id - seg2Id > 0) {
+ return 1;
+ }
+ return 0;
+ }
+ });
+ }
+
+ /**
+ * This method will return the list of loads which are loaded at the same interval.
+ * This property is configurable.
+ *
+ * @param listOfSegmentsBelowThresholdSize
+ * @return
+ */
+ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate(
+ List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) {
+
+ List<LoadMetadataDetails> loadsOfSameDate =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ long numberOfDaysAllowedToMerge = 0;
+ try {
+ numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT));
+ if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) {
+ LOGGER.error(
+ "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT
+ + " is incorrect."
+ + " Correct value should be in range of 0 -100. Taking the default value.");
+ numberOfDaysAllowedToMerge =
+ Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
+ }
+
+ } catch (NumberFormatException e) {
+ numberOfDaysAllowedToMerge =
+ Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
+ }
+ // if true then process loads according to the load date.
+ if (numberOfDaysAllowedToMerge > 0) {
+
+ // filter loads based on the loaded date
+ boolean first = true;
+ Date segDate1 = null;
+ SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+ for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
+
+ if (first) {
+ segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
+ first = false;
+ continue;
+ }
+ String segmentDate = segment.getLoadStartTime();
+ Date segDate2 = null;
+ try {
+ segDate2 = sdf.parse(segmentDate);
+ } catch (ParseException e) {
+ LOGGER.error("Error while parsing segment start time" + e.getMessage());
+ }
+
+ if (isTwoDatesPresentInRequiredRange(segDate1, segDate2, numberOfDaysAllowedToMerge)) {
+ loadsOfSameDate.add(segment);
+ }
+ // if the load is beyond merged date.
+ // then reset everything and continue search for loads.
+ else if (loadsOfSameDate.size() < 2) {
+ loadsOfSameDate.clear();
+ // need to add the next segment as first and to check further
+ segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
+ } else { // case where a load is beyond merge date and there is at least 2 loads to merge.
+ break;
+ }
+ }
+ } else {
+ return listOfSegmentsBelowThresholdSize;
+ }
+
+ return loadsOfSameDate;
+ }
+
+ /**
+ * @param loadsOfSameDate
+ * @param segment
+ * @return
+ */
+ private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate,
+ LoadMetadataDetails segment, SimpleDateFormat sdf) {
+ String baselineLoadStartTime = segment.getLoadStartTime();
+ Date segDate1 = null;
+ try {
+ segDate1 = sdf.parse(baselineLoadStartTime);
+ } catch (ParseException e) {
+ LOGGER.error("Error while parsing segment start time" + e.getMessage());
+ }
+ loadsOfSameDate.add(segment);
+ return segDate1;
+ }
+
+ /**
+ * Method to check if the load dates are complied to the configured dates.
+ *
+ * @param segDate1
+ * @param segDate2
+ * @return
+ */
+ private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segDate2,
+ long numberOfDaysAllowedToMerge) {
+ if(segDate1 == null || segDate2 == null) {
+ return false;
+ }
+ // take 1 st date add the configured days .
+ Calendar cal1 = Calendar.getInstance();
+ cal1.set(segDate1.getYear(), segDate1.getMonth(), segDate1.getDate());
+ Calendar cal2 = Calendar.getInstance();
+ cal2.set(segDate2.getYear(), segDate2.getMonth(), segDate2.getDate());
+
+ long diff = cal2.getTimeInMillis() - cal1.getTimeInMillis();
+
+ if ((diff / (24 * 60 * 60 * 1000)) < numberOfDaysAllowedToMerge) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Identify the segments to be merged based on the Size in case of Major compaction.
+ *
+ * @param compactionSize
+ * @param listOfSegmentsAfterPreserve
+ * @param carbonLoadModel
+ * @param storeLocation
+ * @return
+ */
+ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
+ long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
+ CarbonLoadModel carbonLoadModel, String storeLocation) {
+
+ List<LoadMetadataDetails> segmentsToBeMerged =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ CarbonTableIdentifier tableIdentifier =
+ carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
+
+
+ // total length
+ long totalLength = 0;
+
+ // check size of each segment , sum it up across partitions
+ for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+
+ String segId = segment.getLoadName();
+ // variable to store one segment size across partition.
+ long sizeOfOneSegmentAcrossPartition =
+ getSizeOfSegment(storeLocation, tableIdentifier, segId);
+
+ // if size of a segment is greater than the Major compaction size. then ignore it.
+ if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
+ // if already 2 segments have been found for merging then stop scan here and merge.
+ if (segmentsToBeMerged.size() > 1) {
+ break;
+ } else { // if only one segment is found then remove the earlier one in list.
+ // reset the total length to 0.
+ segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ totalLength = 0;
+ continue;
+ }
+ }
+
+ totalLength += sizeOfOneSegmentAcrossPartition;
+
+ // in case of major compaction the size doesnt matter. all the segments will be merged.
+ if (totalLength < (compactionSize * 1024 * 1024)) {
+ segmentsToBeMerged.add(segment);
+ } else { // if already 2 segments have been found for merging then stop scan here and merge.
+ if (segmentsToBeMerged.size() > 1) {
+ break;
+ } else { // if only one segment is found then remove the earlier one in list and put this.
+ // reset the total length to the current identified segment.
+ segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ segmentsToBeMerged.add(segment);
+ totalLength = sizeOfOneSegmentAcrossPartition;
+ }
+ }
+
+ }
+
+ return segmentsToBeMerged;
+ }
+
+ /**
+ * For calculating the size of the specified segment
+ * @param storeLocation
+ * @param tableIdentifier
+ * @param segId
+ * @return
+ */
+ private static long getSizeOfSegment(String storeLocation,
+ CarbonTableIdentifier tableIdentifier, String segId) {
+ String loadPath = CarbonLoaderUtil
+ .getStoreLocation(storeLocation, tableIdentifier, segId, "0");
+ CarbonFile segmentFolder =
+ FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
+ return getSizeOfFactFileInLoad(segmentFolder);
+ }
+
+ /**
+ * Identify the segments to be merged based on the segment count
+ *
+ * @param listOfSegmentsAfterPreserve
+ * @return
+ */
+ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
+ List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
+
+ List<LoadMetadataDetails> mergedSegments =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ List<LoadMetadataDetails> unMergedSegments =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ int[] noOfSegmentLevelsCount =
+ CarbonProperties.getInstance().getCompactionSegmentLevelCount();
+
+ int level1Size = 0;
+ int level2Size = 0;
+ int size = noOfSegmentLevelsCount.length;
+
+ if (size >= 2) {
+ level1Size = noOfSegmentLevelsCount[0];
+ level2Size = noOfSegmentLevelsCount[1];
+ } else if (size == 1) {
+ level1Size = noOfSegmentLevelsCount[0];
+ }
+
+ int unMergeCounter = 0;
+ int mergeCounter = 0;
+
+ // check size of each segment , sum it up across partitions
+ for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+
+ String segName = segment.getLoadName();
+
+ // if a segment is already merged 2 levels then it s name will become .2
+ // need to exclude those segments from minor compaction.
+ // if a segment is major compacted then should not be considered for minor.
+ if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || (
+ segment.isMajorCompacted() != null && segment.isMajorCompacted()
+ .equalsIgnoreCase("true"))) {
+ continue;
+ }
+
+ // check if the segment is merged or not
+
+ if (!isMergedSegment(segName)) {
+ //if it is an unmerged segment then increment counter
+ unMergeCounter++;
+ unMergedSegments.add(segment);
+ if (unMergeCounter == (level1Size)) {
+ return unMergedSegments;
+ }
+ } else {
+ mergeCounter++;
+ mergedSegments.add(segment);
+ if (mergeCounter == (level2Size)) {
+ return mergedSegments;
+ }
+ }
+ }
+ return new ArrayList<>(0);
+ }
+
+ /**
+ * To check if the segment is merged or not.
+ * @param segName
+ * @return
+ */
+ private static boolean isMergedSegment(String segName) {
+ if(segName.contains(".")){
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * checks number of loads to be preserved and returns remaining valid segments
+ *
+ * @param segments
+ * @return
+ */
+ private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining(
+ List<LoadMetadataDetails> segments) {
+
+ int numberOfSegmentsToBePreserved = 0;
+ // check whether the preserving of the segments from merging is enabled or not.
+ // get the number of loads to be preserved.
+ numberOfSegmentsToBePreserved =
+ CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved();
+ // get the number of valid segments and retain the latest loads from merging.
+ return CarbonDataMergerUtil
+ .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved);
+ }
+
+ /**
+ * Retain the number of segments which are to be preserved and return the remaining list of
+ * segments.
+ *
+ * @param loadMetadataDetails
+ * @param numberOfSegToBeRetained
+ * @return
+ */
+ private static List<LoadMetadataDetails> getValidLoadDetailsWithRetaining(
+ List<LoadMetadataDetails> loadMetadataDetails, int numberOfSegToBeRetained) {
+
+ List<LoadMetadataDetails> validList =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (LoadMetadataDetails segment : loadMetadataDetails) {
+ if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ || segment.getLoadStatus()
+ .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || segment
+ .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE)) {
+ validList.add(segment);
+ }
+ }
+
+ // check if valid list is big enough for removing the number of seg to be retained.
+ // last element
+ int removingIndex = validList.size() - 1;
+
+ for (int i = validList.size(); i > 0; i--) {
+ if (numberOfSegToBeRetained == 0) {
+ break;
+ }
+ // remove last segment
+ validList.remove(removingIndex--);
+ numberOfSegToBeRetained--;
+ }
+ return validList;
+
+ }
+
+ /**
+ * This will give the compaction sizes configured based on compaction type.
+ *
+ * @param compactionType
+ * @return
+ */
+ public static long getCompactionSize(CompactionType compactionType) {
+
+ long compactionSize = 0;
+ switch (compactionType) {
+ case MAJOR_COMPACTION:
+ compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
+ break;
+ default: // this case can not come.
+ }
+ return compactionSize;
+ }
+
+ /**
+ * For getting the comma separated valid segments for merging.
+ *
+ * @param loadMetadataDetails
+ * @return
+ */
+ public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
+ StringBuilder builder = new StringBuilder();
+ for (LoadMetadataDetails segment : loadMetadataDetails) {
+ //check if this load is an already merged load.
+ if (null != segment.getMergedLoadName()) {
+ builder.append(segment.getMergedLoadName() + ",");
+ } else {
+ builder.append(segment.getLoadName() + ",");
+ }
+ }
+ builder.deleteCharAt(builder.length() - 1);
+ return builder.toString();
+ }
+
+ /**
+ * Combining the list of maps to one map.
+ *
+ * @param mapsOfNodeBlockMapping
+ * @return
+ */
+ public static Map<String, List<TableBlockInfo>> combineNodeBlockMaps(
+ List<Map<String, List<TableBlockInfo>>> mapsOfNodeBlockMapping) {
+
+ Map<String, List<TableBlockInfo>> combinedMap =
+ new HashMap<String, List<TableBlockInfo>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ // traverse list of maps.
+ for (Map<String, List<TableBlockInfo>> eachMap : mapsOfNodeBlockMapping) {
+ // traverse inside each map.
+ for (Map.Entry<String, List<TableBlockInfo>> eachEntry : eachMap.entrySet()) {
+
+ String node = eachEntry.getKey();
+ List<TableBlockInfo> blocks = eachEntry.getValue();
+
+ // if already that node detail exist in the combined map.
+ if (null != combinedMap.get(node)) {
+ List<TableBlockInfo> blocksAlreadyPresent = combinedMap.get(node);
+ blocksAlreadyPresent.addAll(blocks);
+ } else { // if its not present in map then put to map.
+ combinedMap.put(node, blocks);
+ }
+ }
+ }
+ return combinedMap;
+ }
+
+ /**
+ * Removing the already merged segments from list.
+ */
+ public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
+ List<LoadMetadataDetails> segments,
+ LoadMetadataDetails lastSeg) {
+
+ // take complete list of segments.
+ List<LoadMetadataDetails> list = new ArrayList<>(segments);
+ // sort list
+ CarbonDataMergerUtil.sortSegments(list);
+
+ // first filter out newly added segments.
+ return list.subList(0, list.indexOf(lastSeg) + 1);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
new file mode 100644
index 0000000..d1cb9d9
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.merger;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.spark.rdd.Compactor;
+
+import org.apache.spark.sql.execution.command.CompactionCallableModel;
+
+/**
+ * Callable class which is used to trigger the compaction in a separate callable.
+ */
+public class CompactionCallable implements Callable<Void> {
+
+ private final CompactionCallableModel compactionCallableModel;
+
+ public CompactionCallable(CompactionCallableModel compactionCallableModel) {
+
+ this.compactionCallableModel = compactionCallableModel;
+ }
+
+ @Override public Void call() throws Exception {
+
+ Compactor.triggerCompaction(compactionCallableModel);
+ return null;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
new file mode 100644
index 0000000..d735b44
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
/**
 * The kinds of compaction that can be requested when merging segments.
 * Declaration order (and therefore ordinal) is MINOR_COMPACTION, then MAJOR_COMPACTION.
 */
public enum CompactionType {
  /** Minor compaction: segments are selected by segment count. */
  MINOR_COMPACTION,

  /** Major compaction: segments are selected by their accumulated size. */
  MAJOR_COMPACTION;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
new file mode 100644
index 0000000..93ba2a3
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+
+/**
+ * Block to Node mapping
+ */
+public class NodeBlockRelation implements Comparable<NodeBlockRelation> {
+
+ private final Distributable block;
+ private final String node;
+
+ public NodeBlockRelation(Distributable block, String node) {
+ this.block = block;
+ this.node = node;
+
+ }
+
+ public Distributable getBlock() {
+ return block;
+ }
+
+ public String getNode() {
+ return node;
+ }
+
+ @Override public int compareTo(NodeBlockRelation obj) {
+ return this.getNode().compareTo(obj.getNode());
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof NodeBlockRelation)) {
+ return false;
+ }
+ NodeBlockRelation o = (NodeBlockRelation) obj;
+ return node.equals(o.node);
+ }
+
+ @Override public int hashCode() {
+ return node.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
new file mode 100644
index 0000000..6b4d1bc
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+
+public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> {
+
+ private final List<Distributable> blocks;
+ private final String node;
+
+ public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
+ this.node = node;
+ this.blocks = blocks;
+
+ }
+
+ public List<Distributable> getBlocks() {
+ return blocks;
+ }
+
+ public String getNode() {
+ return node;
+ }
+
+ @Override public int compareTo(NodeMultiBlockRelation obj) {
+ return this.blocks.size() - obj.getBlocks().size();
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof NodeMultiBlockRelation)) {
+ return false;
+ }
+ NodeMultiBlockRelation o = (NodeMultiBlockRelation) obj;
+ return blocks.equals(o.blocks) && node.equals(o.node);
+ }
+
+ @Override public int hashCode() {
+ return blocks.hashCode() + node.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
new file mode 100644
index 0000000..44c05a1
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.io.File;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * This is the Merger class responsible for the merging of the segments.
+ */
+public class RowResultMerger {
+
+ private final String databaseName;
+ private final String tableName;
+ private final String tempStoreLocation;
+ private final int measureCount;
+ private final String factStoreLocation;
+ private CarbonFactHandler dataHandler;
+ private List<RawResultIterator> rawResultIteratorList =
+ new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ private SegmentProperties segprop;
+ /**
+ * record holder heap
+ */
+ private AbstractQueue<RawResultIterator> recordHolderHeap;
+
+ private TupleConversionAdapter tupleConvertor;
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RowResultMerger.class.getName());
+
+ public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
+ String tableName, SegmentProperties segProp, String tempStoreLocation,
+ CarbonLoadModel loadModel, int[] colCardinality) {
+
+ this.rawResultIteratorList = iteratorList;
+ // create the List of RawResultIterator.
+
+ recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
+ new RowResultMerger.CarbonMdkeyComparator());
+
+ this.segprop = segProp;
+ this.tempStoreLocation = tempStoreLocation;
+
+ this.factStoreLocation = loadModel.getStorePath();
+
+ if (!new File(tempStoreLocation).mkdirs()) {
+ LOGGER.error("Error while new File(tempStoreLocation).mkdirs() ");
+ }
+
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+
+ this.measureCount = segprop.getMeasures().size();
+ CarbonTable carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+ CarbonFactDataHandlerModel carbonFactDataHandlerModel =
+ getCarbonFactDataHandlerModel(loadModel);
+ carbonFactDataHandlerModel.setPrimitiveDimLens(segprop.getDimColumnsCardinality());
+ CarbonDataFileAttributes carbonDataFileAttributes =
+ new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+ loadModel.getFactTimeStamp());
+ carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+ if (segProp.getNumberOfNoDictionaryDimension() > 0
+ || segProp.getComplexDimensions().size() > 0) {
+ carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
+ } else {
+ carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
+ }
+ carbonFactDataHandlerModel.setColCardinality(colCardinality);
+ carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+ dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+
+ tupleConvertor = new TupleConversionAdapter(segProp);
+ }
+
+ /**
+ * Merge function
+ *
+ */
+ public boolean mergerSlice() {
+ boolean mergeStatus = false;
+ int index = 0;
+ try {
+
+ dataHandler.initialise();
+
+ // add all iterators to the queue
+ for (RawResultIterator leaftTupleIterator : this.rawResultIteratorList) {
+ this.recordHolderHeap.add(leaftTupleIterator);
+ index++;
+ }
+ RawResultIterator iterator = null;
+ while (index > 1) {
+ // iterator the top record
+ iterator = this.recordHolderHeap.poll();
+ Object[] convertedRow = iterator.next();
+ if(null == convertedRow){
+ throw new SliceMergerException("Unable to generate mdkey during compaction.");
+ }
+ // get the mdkey
+ addRow(convertedRow);
+ // if there is no record in the leaf and all then decrement the
+ // index
+ if (!iterator.hasNext()) {
+ index--;
+ continue;
+ }
+ // add record to heap
+ this.recordHolderHeap.add(iterator);
+ }
+ // if record holder is not empty then iterator the slice holder from
+ // heap
+ iterator = this.recordHolderHeap.poll();
+ while (true) {
+ Object[] convertedRow = iterator.next();
+ if(null == convertedRow){
+ throw new SliceMergerException("Unable to generate mdkey during compaction.");
+ }
+ addRow(convertedRow);
+ // check if leaf contains no record
+ if (!iterator.hasNext()) {
+ break;
+ }
+ }
+ this.dataHandler.finish();
+ mergeStatus = true;
+ } catch (Exception e) {
+ LOGGER.error("Exception in compaction merger " + e.getMessage());
+ mergeStatus = false;
+ } finally {
+ try {
+ this.dataHandler.closeHandler();
+ } catch (CarbonDataWriterException e) {
+ LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
+ mergeStatus = false;
+ }
+ }
+
+ return mergeStatus;
+ }
+
+ /**
+ * Below method will be used to add sorted row
+ *
+ * @throws SliceMergerException
+ */
+ protected void addRow(Object[] carbonTuple) throws SliceMergerException {
+ Object[] rowInWritableFormat;
+
+ rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
+ try {
+ this.dataHandler.addDataToStore(rowInWritableFormat);
+ } catch (CarbonDataWriterException e) {
+ throw new SliceMergerException("Problem in merging the slice", e);
+ }
+ }
+
+ /**
+ * This method will create a model object for carbon fact data handler
+ *
+ * @param loadModel
+ * @return
+ */
+ private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel) {
+ CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
+ carbonFactDataHandlerModel.setDatabaseName(databaseName);
+ carbonFactDataHandlerModel.setTableName(tableName);
+ carbonFactDataHandlerModel.setMeasureCount(segprop.getMeasures().size());
+ carbonFactDataHandlerModel.setCompactionFlow(true);
+ carbonFactDataHandlerModel
+ .setMdKeyLength(segprop.getDimensionKeyGenerator().getKeySizeInBytes());
+ carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
+ carbonFactDataHandlerModel.setDimLens(segprop.getDimColumnsCardinality());
+ carbonFactDataHandlerModel.setSegmentProperties(segprop);
+ carbonFactDataHandlerModel.setNoDictionaryCount(segprop.getNumberOfNoDictionaryDimension());
+ carbonFactDataHandlerModel.setDimensionCount(
+ segprop.getDimensions().size() - carbonFactDataHandlerModel.getNoDictionaryCount());
+ CarbonTable carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+ List<ColumnSchema> wrapperColumnSchema = CarbonUtil
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName),
+ carbonTable.getMeasureByTableName(tableName));
+ carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
+ //TO-DO Need to handle complex types here .
+ Map<Integer, GenericDataType> complexIndexMap =
+ new HashMap<Integer, GenericDataType>(segprop.getComplexDimensions().size());
+ carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
+ carbonFactDataHandlerModel.setDataWritingRequest(true);
+
+ char[] aggType = new char[segprop.getMeasures().size()];
+ Arrays.fill(aggType, 'n');
+ int i = 0;
+ for (CarbonMeasure msr : segprop.getMeasures()) {
+ aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
+ }
+ carbonFactDataHandlerModel.setAggType(aggType);
+ carbonFactDataHandlerModel.setFactDimLens(segprop.getDimColumnsCardinality());
+
+ String carbonDataDirectoryPath =
+ checkAndCreateCarbonStoreLocation(this.factStoreLocation, databaseName, tableName,
+ loadModel.getPartitionId(), loadModel.getSegmentId());
+ carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
+
+ List<CarbonDimension> dimensionByTableName =
+ loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName);
+ boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
+ int index = 0;
+ for (CarbonDimension dimension : dimensionByTableName) {
+ isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex();
+ }
+ carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
+ return carbonFactDataHandlerModel;
+ }
+
+ /**
+ * This method will get the store location for the given path, segment id and partition id
+ *
+ * @return data directory path
+ */
+ private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName,
+ String tableName, String partitionId, String segmentId) {
+ String carbonStorePath = factStoreLocation;
+ CarbonTable carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+ CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
+ CarbonTablePath carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
+ String carbonDataDirectoryPath =
+ carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
+ CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+ return carbonDataDirectoryPath;
+ }
+
+ /**
+ * Comparator class for comparing 2 raw row result.
+ */
+ private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
+
+ @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
+
+ Object[] row1 = new Object[0];
+ Object[] row2 = new Object[0];
+ try {
+ row1 = o1.fetchConverted();
+ row2 = o2.fetchConverted();
+ } catch (KeyGenException e) {
+ LOGGER.error(e.getMessage());
+ }
+ if (null == row1 || null == row2) {
+ return 0;
+ }
+ ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
+ ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
+ int compareResult = 0;
+ int[] columnValueSizes = segprop.getEachDimColumnValueSize();
+ int dictionaryKeyOffset = 0;
+ byte[] dimCols1 = key1.getDictionaryKey();
+ byte[] dimCols2 = key2.getDictionaryKey();
+ int noDicIndex = 0;
+ for (int eachColumnValueSize : columnValueSizes) {
+ // case of dictionary cols
+ if (eachColumnValueSize > 0) {
+
+ compareResult = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
+ dictionaryKeyOffset, eachColumnValueSize);
+ dictionaryKeyOffset += eachColumnValueSize;
+ } else { // case of no dictionary
+
+ byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
+ byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
+ compareResult =
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
+ noDicIndex++;
+
+ }
+ if (0 != compareResult) {
+ return compareResult;
+ }
+ }
+ return 0;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
new file mode 100644
index 0000000..68cb6e9
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.merger;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+
+public class TableMeta implements Serializable {
+
+ private static final long serialVersionUID = -1749874611119829431L;
+
+ public CarbonTableIdentifier carbonTableIdentifier;
+ public String storePath;
+ public CarbonTable carbonTable;
+
+ public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath,
+ CarbonTable carbonTable) {
+ this.carbonTableIdentifier = carbonTableIdentifier;
+ this.storePath = storePath;
+ this.carbonTable = carbonTable;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
new file mode 100644
index 0000000..94ebf40
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
+import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * This class will be used to convert the Result into the format used in data writer.
+ */
+public class TupleConversionAdapter {
+
+ private final SegmentProperties segmentproperties;
+
+ private final List<CarbonMeasure> measureList;
+
+ private int noDictionaryPresentIndex;
+
+ private int measureCount;
+
+ private boolean isNoDictionaryPresent;
+
+ public TupleConversionAdapter(SegmentProperties segmentProperties) {
+ this.measureCount = segmentProperties.getMeasures().size();
+ this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0;
+ if (isNoDictionaryPresent) {
+ noDictionaryPresentIndex++;
+ }
+ this.segmentproperties = segmentProperties;
+ measureList = segmentProperties.getMeasures();
+ }
+
+ /**
+ * Converting the raw result to the format understandable by the data writer.
+ * @param carbonTuple
+ * @return
+ */
+ public Object[] getObjectArray(Object[] carbonTuple) {
+ Object[] row = new Object[measureCount + noDictionaryPresentIndex + 1];
+ int index = 0;
+ // put measures.
+
+ for (int j = 1; j <= measureCount; j++) {
+ row[index++] = carbonTuple[j];
+ }
+
+ // put No dictionary byte []
+ if (isNoDictionaryPresent) {
+
+ int noDicCount = segmentproperties.getNumberOfNoDictionaryDimension();
+ List<byte[]> noDicByteArr = new ArrayList<>(noDicCount);
+ for (int i = 0; i < noDicCount; i++) {
+ noDicByteArr.add(((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeyByIndex(i));
+ }
+ byte[] singleByteArr = RemoveDictionaryUtil.convertListByteArrToSingleArr(noDicByteArr);
+
+ row[index++] = singleByteArr;
+ }
+
+ // put No Dictionary Dims
+ row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getDictionaryKey();
+ return row;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
new file mode 100644
index 0000000..58f3a2d
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api;
+
+import java.util.List;
+
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+
+import org.apache.spark.sql.execution.command.Partitioner;
+
+public interface DataPartitioner {
+ /**
+ * Initialise the partitioner based on the given columns
+ */
+ void initialize(String basePath, String[] columns, Partitioner partitioner);
+
+ /**
+ * All the partitions built by the Partitioner
+ */
+ List<Partition> getAllPartitions();
+
+ /**
+ * Partition where the tuple should be present. (API used for data loading purpose)
+ */
+ Partition getPartionForTuple(Object[] tuple, long rowCounter);
+
+ /**
+ * Identifies the partitions applicable for the given filter (API used for For query)
+ */
+ List<Partition> getPartitions(CarbonQueryPlan queryPlan);
+
+ String[] getPartitionedColumns();
+
+ Partitioner getPartitioner();
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
new file mode 100644
index 0000000..61639d3
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api;
+
+import java.io.Serializable;
+import java.util.List;
+
/**
 * A unit of partitioned data; implementations are {@link Serializable}.
 */
public interface Partition extends Serializable {

  /**
   * @return an identifier that uniquely identifies this partition in the cluster
   */
  String getUniqueID();

  /**
   * @return the file path of the raw data represented by this partition
   */
  String getFilePath();

  /**
   * @return the file paths associated with this partition (NOTE(review): some implementations
   *         may return {@code null}; check the implementation before relying on it)
   */
  List<String> getFilesPath();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
new file mode 100644
index 0000000..bc6e54f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+public final class DataPartitionerProperties {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DataPartitionerProperties.class.getName());
+
+ private static DataPartitionerProperties instance;
+
+ private Properties properties;
+
+ private DataPartitionerProperties() {
+ properties = loadProperties();
+ }
+
+ public static DataPartitionerProperties getInstance() {
+ if (instance == null) {
+ instance = new DataPartitionerProperties();
+ }
+ return instance;
+ }
+
+ public String getValue(String key, String defaultVal) {
+ return properties.getProperty(key, defaultVal);
+ }
+
+ public String getValue(String key) {
+ return properties.getProperty(key);
+ }
+
+ /**
+ * Read the properties from CSVFilePartitioner.properties
+ */
+ private Properties loadProperties() {
+ Properties props = new Properties();
+
+ File file = new File("DataPartitioner.properties");
+ FileInputStream fis = null;
+ try {
+ if (file.exists()) {
+ fis = new FileInputStream(file);
+
+ props.load(fis);
+ }
+ } catch (Exception e) {
+ LOGGER
+ .error(e, e.getMessage());
+ } finally {
+ if (null != fis) {
+ try {
+ fis.close();
+ } catch (IOException e) {
+ LOGGER.error(e,
+ e.getMessage());
+ }
+ }
+ }
+
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
new file mode 100644
index 0000000..9bee8a2
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.spark.partition.api.Partition;
+
+/**
+ * A sample load balancer to distribute the partitions to the available nodes in a round robin mode.
+ */
+public class DefaultLoadBalancer {
+ private Map<String, List<Partition>> nodeToPartitonMap =
+ new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ private Map<Partition, String> partitonToNodeMap =
+ new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) {
+ //Per form a round robin allocation
+ int nodeCount = nodes.size();
+
+ int partitioner = 0;
+ for (Partition partition : partitions) {
+ int nodeindex = partitioner % nodeCount;
+ String node = nodes.get(nodeindex);
+
+ List<Partition> oldList = nodeToPartitonMap.get(node);
+ if (oldList == null) {
+ oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ nodeToPartitonMap.put(node, oldList);
+ }
+ oldList.add(partition);
+
+ partitonToNodeMap.put(partition, node);
+
+ partitioner++;
+ }
+ }
+
+ public List<Partition> getPartitionsForNode(String node) {
+ return nodeToPartitonMap.get(node);
+ }
+
+ public String getNodeForPartitions(Partition partition) {
+ return partitonToNodeMap.get(partition);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
new file mode 100644
index 0000000..bd7cc42
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.spark.partition.api.Partition;
+
+public class PartitionImpl implements Partition {
+ private static final long serialVersionUID = 3020172346383028547L;
+ private String uniqueID;
+ private String folderPath;
+
+
+ public PartitionImpl(String uniqueID, String folderPath) {
+ this.uniqueID = uniqueID;
+ this.folderPath = folderPath;
+ }
+
+ @Override public String getUniqueID() {
+ return uniqueID;
+ }
+
+ @Override public String getFilePath() {
+ return folderPath;
+ }
+
+ @Override public String toString() {
+ return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}';
+ }
+
+ @Override public List<String> getFilesPath() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
new file mode 100644
index 0000000..de32b5c
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.spark.partition.api.Partition;
+
+public class PartitionMultiFileImpl implements Partition {
+ private static final long serialVersionUID = -4363447826181193976L;
+ private String uniqueID;
+ private List<String> folderPath;
+
+ public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) {
+ this.uniqueID = uniqueID;
+ this.folderPath = folderPath;
+ }
+
+ @Override public String getUniqueID() {
+ // TODO Auto-generated method stub
+ return uniqueID;
+ }
+
+ @Override public String getFilePath() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override public List<String> getFilesPath() {
+ // TODO Auto-generated method stub
+ return folderPath;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
new file mode 100644
index 0000000..e05be7d
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.DataPartitioner;
+import org.apache.carbondata.spark.partition.api.Partition;
+
+
+public final class QueryPartitionHelper {
+ private static QueryPartitionHelper instance = new QueryPartitionHelper();
+ private Map<String, DataPartitioner> partitionerMap =
+ new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ private Map<String, DefaultLoadBalancer> loadBalancerMap =
+ new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ private QueryPartitionHelper() {
+
+ }
+
+ public static QueryPartitionHelper getInstance() {
+ return instance;
+ }
+
+ /**
+ * Get partitions applicable for query based on filters applied in query
+ */
+ public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
+ String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
+
+ DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
+
+ List<Partition> queryPartitions = dataPartitioner.getPartitions(queryPlan);
+ return queryPartitions;
+ }
+
+ public List<Partition> getAllPartitions(String databaseName, String tableName) {
+ String tableUniqueName = databaseName + '_' + tableName;
+
+ DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
+
+ return dataPartitioner.getAllPartitions();
+ }
+
+ /**
+ * Get the node name where the partition is assigned to.
+ */
+ public String getLocation(Partition partition, String databaseName, String tableName) {
+ String tableUniqueName = databaseName + '_' + tableName;
+
+ DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
+ return loadBalancer.getNodeForPartitions(partition);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
new file mode 100644
index 0000000..c9b434a
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.DataPartitioner;
+import org.apache.carbondata.spark.partition.api.Partition;
+
+import org.apache.spark.sql.execution.command.Partitioner;
+
+/**
+ * Sample partition.
+ */
+public class SampleDataPartitionerImpl implements DataPartitioner {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(SampleDataPartitionerImpl.class.getName());
+ private int numberOfPartitions = 1;
+
+ private int partionColumnIndex = -1;
+
+ private String partitionColumn;
+
+ private Partitioner partitioner;
+ private List<Partition> allPartitions;
+ private String baseLocation;
+
+ public SampleDataPartitionerImpl() {
+ }
+
+ public void initialize(String basePath, String[] columns, Partitioner partitioner) {
+ this.partitioner = partitioner;
+ numberOfPartitions = partitioner.partitionCount();
+
+ partitionColumn = partitioner.partitionColumn()[0];
+ LOGGER.info("SampleDataPartitionerImpl initializing with following properties.");
+ LOGGER.info("partitionCount: " + numberOfPartitions);
+ LOGGER.info("partitionColumn: " + partitionColumn);
+ LOGGER.info("basePath: " + basePath);
+ LOGGER.info("columns: " + Arrays.toString(columns));
+
+ this.baseLocation = basePath;
+ allPartitions = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+ for (int i = 0; i < columns.length; i++) {
+ if (columns[i].equalsIgnoreCase(partitionColumn)) {
+ partionColumnIndex = i;
+ break;
+ }
+ }
+
+ for (int partionCounter = 0; partionCounter < numberOfPartitions; partionCounter++) {
+ PartitionImpl partitionImpl =
+ new PartitionImpl("" + partionCounter, baseLocation + '/' + partionCounter);
+
+ List<Object> includedHashes = new ArrayList<Object>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ includedHashes.add(partionCounter);
+
+ allPartitions.add(partitionImpl);
+ }
+ }
+
+ @Override public Partition getPartionForTuple(Object[] tuple, long rowCounter) {
+ int hashCode;
+ if (partionColumnIndex == -1) {
+ hashCode = hashCode(rowCounter);
+ } else {
+ try {
+ hashCode = hashCode(((String) tuple[partionColumnIndex]).hashCode());
+ } catch (NumberFormatException e) {
+ hashCode = hashCode(0);
+ }
+ }
+ return allPartitions.get(hashCode);
+ }
+
+ /**
+ *
+ */
+ public List<Partition> getAllPartitions() {
+ return allPartitions;
+ }
+
+ /**
+ * @see DataPartitioner#getPartitions(CarbonQueryPlan)
+ */
+ public List<Partition> getPartitions(CarbonQueryPlan queryPlan) {
+ // TODO: this has to be redone during partitioning implmentatation
+ return allPartitions;
+ }
+
+ /**
+ * Identify the partitions applicable for the given filter
+ */
+ public List<Partition> getPartitions() {
+ return allPartitions;
+
+ // TODO: this has to be redone during partitioning implementation
+ // for (Partition aPartition : allPartitions) {
+ // CarbonDimensionLevelFilter partitionFilterDetails =
+ // aPartition.getPartitionDetails().get(partitionColumn);
+ //
+ // //Check if the partition is serving any of the
+ // //hash code generated for include filter of query
+ // for (Object includeFilter : msisdnFilter.getIncludeFilter()) {
+ // int hashCode = hashCode(((String) includeFilter).hashCode());
+ // if (partitionFilterDetails.getIncludeFilter().contains(hashCode)) {
+ // allowedPartitions.add(aPartition);
+ // break;
+ // }
+ // }
+ // }
+
+ }
+
+ private int hashCode(long key) {
+ return (int) (Math.abs(key) % numberOfPartitions);
+ }
+
+ @Override public String[] getPartitionedColumns() {
+ return new String[] { partitionColumn };
+ }
+
+ @Override public Partitioner getPartitioner() {
+ return partitioner;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
new file mode 100644
index 0000000..bb8fc5c
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.readsupport;
+
+import java.sql.Timestamp;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
+
+ @Override public void initialize(CarbonColumn[] carbonColumns,
+ AbsoluteTableIdentifier absoluteTableIdentifier) {
+ super.initialize(carbonColumns, absoluteTableIdentifier);
+ //can initialize and generate schema here.
+ }
+
+ @Override public Row readRow(Object[] data) {
+ for (int i = 0; i < dictionaries.length; i++) {
+ if (dictionaries[i] != null) {
+ data[i] = DataTypeUtil
+ .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
+ dataTypes[i]);
+ switch (dataTypes[i]) {
+ case STRING:
+ data[i] = UTF8String.fromString(data[i].toString());
+ break;
+ case TIMESTAMP:
+ data[i] = new Timestamp((long) data[i] / 1000);
+ break;
+ case LONG:
+ data[i] = data[i];
+ break;
+ default:
+ }
+ } else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ //convert the long to timestamp in case of direct dictionary column
+ if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
+ data[i] = new Timestamp((long) data[i] / 1000);
+ }
+ }
+ }
+ return new GenericRow(data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
new file mode 100644
index 0000000..3fb24e2
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.carbondata.spark.splits;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.spark.partition.api.Partition;
+
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * It represents one region server as one split.
+ */
+public class TableSplit implements Serializable, Writable {
+ private static final long serialVersionUID = -8058151330863145575L;
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(TableSplit.class.getName());
+ private List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+ private Partition partition;
+
+ /**
+ * @return the locations
+ */
+ public List<String> getLocations() {
+ return locations;
+ }
+
+ /**
+ * @param locations the locations to set
+ */
+ public void setLocations(List<String> locations) {
+ this.locations = locations;
+ }
+
+ /**
+ * @return Returns the partitions.
+ */
+ public Partition getPartition() {
+ return partition;
+ }
+
+ /**
+ * @param partition The partitions to set.
+ */
+ public void setPartition(Partition partition) {
+ this.partition = partition;
+ }
+
+ @Override public void readFields(DataInput in) throws IOException {
+
+ int sizeLoc = in.readInt();
+ for (int i = 0; i < sizeLoc; i++) {
+ byte[] b = new byte[in.readInt()];
+ in.readFully(b);
+ locations.add(new String(b, Charset.defaultCharset()));
+ }
+
+ byte[] buf = new byte[in.readInt()];
+ in.readFully(buf);
+ ByteArrayInputStream bis = new ByteArrayInputStream(buf);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ try {
+ partition = (Partition) ois.readObject();
+ } catch (ClassNotFoundException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ ois.close();
+ }
+
+ @Override public void write(DataOutput out) throws IOException {
+
+ int sizeLoc = locations.size();
+ out.writeInt(sizeLoc);
+ for (int i = 0; i < sizeLoc; i++) {
+ byte[] bytes = locations.get(i).getBytes(Charset.defaultCharset());
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ }
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ ObjectOutputStream obs = new ObjectOutputStream(bos);
+ obs.writeObject(partition);
+ obs.close();
+ byte[] byteArray = bos.toByteArray();
+ out.writeInt(byteArray.length);
+ out.write(byteArray);
+ }
+
+ public String toString() {
+ return partition.getUniqueID() + ' ' + locations;
+ }
+}