You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/15 11:50:44 UTC
[39/42] carbondata git commit: Fixed synchronization issue and
improved IUD performance
Fixed synchronization issue and improved IUD performance
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da952e82
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da952e82
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da952e82
Branch: refs/heads/branch-1.1
Commit: da952e82b443839e9c8b7fdeebaed092d3232652
Parents: bbf5dc1
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Jun 12 16:06:24 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 15 13:32:15 2017 +0530
----------------------------------------------------------------------
.../core/datastore/block/AbstractIndex.java | 41 ++++++++
.../core/datastore/block/TableBlockInfo.java | 22 +++-
.../core/mutate/CarbonUpdateUtil.java | 16 +++
.../core/mutate/DeleteDeltaBlockletDetails.java | 15 +--
.../carbondata/core/mutate/DeleteDeltaVo.java | 60 +++++++++++
.../reader/CarbonDeleteFilesDataReader.java | 47 +++++++++
.../impl/DictionaryBasedResultCollector.java | 11 +-
.../collector/impl/RawBasedResultCollector.java | 7 +-
...structureBasedDictionaryResultCollector.java | 7 +-
.../RestructureBasedRawResultCollector.java | 7 +-
.../executor/impl/AbstractQueryExecutor.java | 9 +-
.../scan/executor/infos/BlockExecutionInfo.java | 56 ++++++----
.../scan/executor/infos/DeleteDeltaInfo.java | 82 +++++++++++++++
.../core/scan/result/AbstractScannedResult.java | 61 +++++++----
.../AbstractDetailQueryResultIterator.java | 103 ++++++++++++++++++-
.../scan/scanner/AbstractBlockletScanner.java | 9 --
.../core/scan/scanner/impl/FilterScanner.java | 10 --
.../SegmentUpdateStatusManager.java | 29 ++++--
.../datastore/SegmentTaskIndexStoreTest.java | 2 +-
.../core/datastore/block/BlockInfoTest.java | 12 +--
.../datastore/block/TableBlockInfoTest.java | 32 +++---
.../core/datastore/block/TableTaskInfoTest.java | 8 +-
.../carbondata/core/util/CarbonUtilTest.java | 4 +-
.../core/util/DataFileFooterConverterTest.java | 8 +-
.../carbondata/hadoop/CarbonInputFormat.java | 11 +-
.../carbondata/hadoop/CarbonInputSplit.java | 39 +++++--
.../internal/index/impl/InMemoryBTreeIndex.java | 5 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 3 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../carbon/datastore/BlockIndexStoreTest.java | 28 ++---
31 files changed, 574 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
index b538dc3..4d0e56d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
@@ -17,11 +17,13 @@
package org.apache.carbondata.core.datastore.block;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.carbondata.core.cache.Cacheable;
import org.apache.carbondata.core.datastore.DataRefNode;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
public abstract class AbstractIndex implements Cacheable {
@@ -51,6 +53,16 @@ public abstract class AbstractIndex implements Cacheable {
protected long memorySize;
/**
+ * last fetch delete deltaFile timestamp
+ */
+ private long deleteDeltaTimestamp;
+
+ /**
+ * map of blockletidAndPageId to
+ * deleted rows
+ */
+ private Map<String, DeleteDeltaVo> deletedRowsMap;
+ /**
* @return the segmentProperties
*/
public SegmentProperties getSegmentProperties() {
@@ -124,4 +136,33 @@ public abstract class AbstractIndex implements Cacheable {
public void setMemorySize(long memorySize) {
this.memorySize = memorySize;
}
+
+ /**
+ * @return latest deleted delta timestamp
+ */
+ public long getDeleteDeltaTimestamp() {
+ return deleteDeltaTimestamp;
+ }
+
+ /**
+ * set the latest delete delta timestamp
+ * @param deleteDeltaTimestamp
+ */
+ public void setDeleteDeltaTimestamp(long deleteDeltaTimestamp) {
+ this.deleteDeltaTimestamp = deleteDeltaTimestamp;
+ }
+
+ /**
+ * @return the deleted record for block map
+ */
+ public Map<String, DeleteDeltaVo> getDeletedRowsMap() {
+ return deletedRowsMap;
+ }
+
+ /**
+ * @param deletedRowsMap
+ */
+ public void setDeletedRowsMap(Map<String, DeleteDeltaVo> deletedRowsMap) {
+ this.deletedRowsMap = deletedRowsMap;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 8fbaa4a..44347cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -72,14 +72,20 @@ public class TableBlockInfo implements Distributable, Serializable {
private Map<String, String> blockStorageIdMap =
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ /**
+ * delete delta files path for this block
+ */
+ private String[] deletedDeltaFilePath;
+
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
- long blockLength, ColumnarFormatVersion version) {
+ long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
this.filePath = FileFactory.getUpdatedFilePath(filePath);
this.blockOffset = blockOffset;
this.segmentId = segmentId;
this.locations = locations;
this.blockLength = blockLength;
this.version = version;
+ this.deletedDeltaFilePath = deletedDeltaFilePath;
}
/**
@@ -93,8 +99,9 @@ public class TableBlockInfo implements Distributable, Serializable {
* @param blockletInfos
*/
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
- long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version) {
- this(filePath, blockOffset, segmentId, locations, blockLength, version);
+ long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
+ String[] deletedDeltaFilePath) {
+ this(filePath, blockOffset, segmentId, locations, blockLength, version, deletedDeltaFilePath);
this.blockletInfos = blockletInfos;
}
@@ -112,8 +119,9 @@ public class TableBlockInfo implements Distributable, Serializable {
*/
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
- Map<String, String> blockStorageIdMap) {
- this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version);
+ Map<String, String> blockStorageIdMap, String[] deletedDeltaFilePath) {
+ this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version,
+ deletedDeltaFilePath);
this.blockStorageIdMap = blockStorageIdMap;
}
@@ -307,4 +315,8 @@ public class TableBlockInfo implements Distributable, Serializable {
public void setBlockStorageIdMap(Map<String, String> blockStorageIdMap) {
this.blockStorageIdMap = blockStorageIdMap;
}
+
+ public String[] getDeletedDeltaFilePath() {
+ return deletedDeltaFilePath;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index fef5905..b5a632f 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -800,4 +800,20 @@ public class CarbonUpdateUtil {
}
+ /**
+ * Below method will be used to get the latest delete delta file timestamp
+ * @param deleteDeltaFiles
+ * @return latest delete delta file time stamp
+ */
+ public static long getLatestDeleteDeltaTimestamp(String[] deleteDeltaFiles) {
+ long latestTimestamp = 0;
+ for (int i = 0; i < deleteDeltaFiles.length; i++) {
+ long convertTimeStampToLong = Long.parseLong(
+ CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(deleteDeltaFiles[i]));
+ if (latestTimestamp < convertTimeStampToLong) {
+ latestTimestamp = convertTimeStampToLong;
+ }
+ }
+ return latestTimestamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
index 7df5f22..0f54f3a 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
@@ -21,9 +21,6 @@ import java.io.Serializable;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
/**
* This class stores the blocklet details of delete delta file
*/
@@ -35,12 +32,6 @@ public class DeleteDeltaBlockletDetails implements Serializable {
private Set<Integer> deletedRows;
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DeleteDeltaBlockletDetails.class.getName());
-
public DeleteDeltaBlockletDetails(String id, Integer pageId) {
this.id = id;
deletedRows = new TreeSet<Integer>();
@@ -84,7 +75,11 @@ public class DeleteDeltaBlockletDetails implements Serializable {
}
@Override public int hashCode() {
- return id.hashCode();
+ return id.hashCode() + pageId.hashCode();
+ }
+
+ public String getBlockletKey() {
+ return this.id + '_' + this.pageId;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
new file mode 100644
index 0000000..d68e4e9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.mutate;
+
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Class which keep the information about the rows
+ * while got deleted
+ */
+public class DeleteDeltaVo {
+
+ /**
+ * deleted rows bitset
+ */
+ private BitSet bitSet;
+
+ public DeleteDeltaVo() {
+ bitSet = new BitSet();
+ }
+
+ /**
+ * Below method will be used to insert the rows
+ * which are deleted
+ *
+ * @param data
+ */
+ public void insertData(Set<Integer> data) {
+ Iterator<Integer> iterator = data.iterator();
+ while (iterator.hasNext()) {
+ bitSet.set(iterator.next());
+ }
+ }
+
+ /**
+ * below method will be used to check the row is deleted or not
+ *
+ * @param counter
+ * @return
+ */
+ public boolean containsRow(int counter) {
+ return bitSet.get(counter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index e689566..417ad29 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
import org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
import org.apache.carbondata.core.util.CarbonProperties;
@@ -120,7 +122,52 @@ public class CarbonDeleteFilesDataReader {
}
}
return pageIdDeleteRowsMap;
+ }
+ /**
+ * Below method will be used to read the delete delta files
+ * and get the map of blockletid and page id mapping to deleted
+ * rows
+ *
+ * @param deltaFiles delete delta files array
+ * @return map of blockletid_pageid to deleted rows
+ */
+ public Map<String, DeleteDeltaVo> getDeletedRowsDataVo(String[] deltaFiles) {
+ List<Future<DeleteDeltaBlockDetails>> taskSubmitList = new ArrayList<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
+ for (final String deltaFile : deltaFiles) {
+ taskSubmitList.add(executorService.submit(new Callable<DeleteDeltaBlockDetails>() {
+ @Override public DeleteDeltaBlockDetails call() throws IOException {
+ CarbonDeleteDeltaFileReaderImpl deltaFileReader =
+ new CarbonDeleteDeltaFileReaderImpl(deltaFile, FileFactory.getFileType(deltaFile));
+ return deltaFileReader.readJson();
+ }
+ }));
+ }
+ try {
+ executorService.shutdown();
+ executorService.awaitTermination(30, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ LOGGER.error("Error while reading the delete delta files : " + e.getMessage());
+ }
+ Map<String, DeleteDeltaVo> pageIdToBlockLetVo = new HashMap<>();
+ List<DeleteDeltaBlockletDetails> blockletDetails = null;
+ for (int i = 0; i < taskSubmitList.size(); i++) {
+ try {
+ blockletDetails = taskSubmitList.get(i).get().getBlockletDetails();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ for (DeleteDeltaBlockletDetails blockletDetail : blockletDetails) {
+ DeleteDeltaVo deleteDeltaVo = pageIdToBlockLetVo.get(blockletDetail.getBlockletKey());
+ if (null == deleteDeltaVo) {
+ deleteDeltaVo = new DeleteDeltaVo();
+ pageIdToBlockLetVo.put(blockletDetail.getBlockletKey(), deleteDeltaVo);
+ }
+ deleteDeltaVo.insertData(blockletDetail.getDeletedRows());
+ }
+ }
+ return pageIdToBlockLetVo;
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index d4d16d0..dba92ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -17,9 +17,11 @@
package org.apache.carbondata.core.scan.collector.impl;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -90,8 +92,6 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
int[] surrogateResult;
String[] noDictionaryKeys;
byte[][] complexTypeKeyArray;
- BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
- scannedResult.getDeleteDeltaDataCache();
while (scannedResult.hasNext() && rowCounter < batchSize) {
Object[] row = new Object[queryDimensions.length + queryMeasures.length];
if (isDimensionExists) {
@@ -108,8 +108,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
} else {
scannedResult.incrementCounter();
}
- if (null != deleteDeltaDataCache && deleteDeltaDataCache
- .contains(scannedResult.getCurrentRowId(), scannedResult.getCurrentPageCounter())) {
+ if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
continue;
}
fillMeasureData(scannedResult, row);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 478dc8c..3e82257 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -20,7 +20,6 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.QueryMeasure;
@@ -54,15 +53,11 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector {
@Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
List<Object[]> listBasedResult = new ArrayList<>(batchSize);
QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
- BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
- scannedResult.getDeleteDeltaDataCache();
// scan the record and add to list
int rowCounter = 0;
while (scannedResult.hasNext() && rowCounter < batchSize) {
scanResultAndGetData(scannedResult);
- if (null != deleteDeltaDataCache && deleteDeltaDataCache
- .contains(scannedResult.getCurrentRowId(),
- scannedResult.getCurrentPageCounter())) {
+ if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
continue;
}
prepareRow(scannedResult, listBasedResult, queryMeasures);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 4fa1494..8f89760 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.result.AbstractScannedResult;
@@ -50,8 +49,6 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
int[] surrogateResult;
String[] noDictionaryKeys;
byte[][] complexTypeKeyArray;
- BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
- scannedResult.getDeleteDeltaDataCache();
Map<Integer, GenericQueryType> comlexDimensionInfoMap =
tableBlockExecutionInfos.getComlexDimensionInfoMap();
while (scannedResult.hasNext() && rowCounter < batchSize) {
@@ -80,9 +77,7 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
} else {
scannedResult.incrementCounter();
}
- if (null != deleteDeltaDataCache && deleteDeltaDataCache
- .contains(scannedResult.getCurrentRowId(),
- scannedResult.getCurrentPageCounter())) {
+ if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
continue;
}
fillMeasureData(scannedResult, row);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 2de74fa..479a684 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenException;
@@ -152,15 +151,11 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
@Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
List<Object[]> listBasedResult = new ArrayList<>(batchSize);
QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
- BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
- scannedResult.getDeleteDeltaDataCache();
// scan the record and add to list
int rowCounter = 0;
while (scannedResult.hasNext() && rowCounter < batchSize) {
scanResultAndGetData(scannedResult);
- if (null != deleteDeltaDataCache && deleteDeltaDataCache
- .contains(scannedResult.getCurrentRowId(),
- scannedResult.getCurrentPageCounter())) {
+ if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
continue;
}
// re-fill dictionary and no dictionary key arrays for the newly added columns
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 2a5c342..ba7530d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -193,7 +193,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i),
queryModel.getTableBlockInfos().get(i).getBlockletInfos().getStartBlockletNumber(),
queryModel.getTableBlockInfos().get(i).getBlockletInfos().getNumberOfBlockletToScan(),
- queryModel.getTableBlockInfos().get(i).getFilePath()));
+ queryModel.getTableBlockInfos().get(i).getFilePath(),
+ queryModel.getTableBlockInfos().get(i).getDeletedDeltaFilePath()));
}
if (null != queryModel.getStatisticsRecorder()) {
QueryStatistic queryStatistic = new QueryStatistic();
@@ -214,7 +215,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
* @throws QueryExecutionException any failure during block info creation
*/
protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
- AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath)
+ AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath,
+ String[] deleteDeltaFiles)
throws QueryExecutionException {
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
@@ -232,6 +234,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier()).getFactDir()
.length() + 1;
blockExecutionInfo.setBlockId(filePath.substring(tableFactPathLength));
+ blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
blockExecutionInfo.setQueryDimensions(currentBlockQueryDimensions
@@ -360,8 +363,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// setting the no dictionary column block indexes
blockExecutionInfo.setNoDictionaryBlockIndexes(ArrayUtils.toPrimitive(
noDictionaryColumnBlockIndex.toArray(new Integer[noDictionaryColumnBlockIndex.size()])));
- // setting column id to dictionary mapping
- blockExecutionInfo.setColumnIdToDcitionaryMapping(queryProperties.columnToDictionayMapping);
// setting each column value size
blockExecutionInfo.setEachColumnValueSize(segmentProperties.getEachDimColumnValueSize());
blockExecutionInfo.setComplexColumnParentBlockIndexes(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index b294b58..7d08dda 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -18,12 +18,12 @@ package org.apache.carbondata.core.scan.executor.infos;
import java.util.Map;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.datastore.DataRefNode;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.model.QueryDimension;
@@ -101,12 +101,6 @@ public class BlockExecutionInfo {
private int[] projectionListMeasureIndexes;
/**
- * this will be used to update the older block fixed length keys with the
- * new block fixed length key
- */
- private KeyStructureInfo keyStructureInfo;
-
- /**
* first block from which query execution will start
*/
private DataRefNode firstDataBlock;
@@ -146,12 +140,6 @@ public class BlockExecutionInfo {
private Map<Integer, KeyStructureInfo> columnGroupToKeyStructureInfo;
/**
- * mapping of dictionary dimension to its dictionary mapping which will be
- * used to get the actual data from dictionary for aggregation, sorting
- */
- private Map<String, Dictionary> columnIdToDcitionaryMapping;
-
- /**
* filter tree to execute the filter
*/
private FilterExecuter filterExecuterTree;
@@ -230,6 +218,13 @@ public class BlockExecutionInfo {
*/
private AbsoluteTableIdentifier absoluteTableIdentifier;
+ /**
+ * delete delta file path
+ */
+ private String[] deleteDeltaFilePath;
+
+ private Map<String, DeleteDeltaVo> deletedRecordsMap;
+
public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
return absoluteTableIdentifier;
}
@@ -484,13 +479,6 @@ public class BlockExecutionInfo {
this.columnGroupToKeyStructureInfo = columnGroupToKeyStructureInfo;
}
- /**
- * @param columnIdToDcitionaryMapping the columnIdToDcitionaryMapping to set
- */
- public void setColumnIdToDcitionaryMapping(Map<String, Dictionary> columnIdToDcitionaryMapping) {
- this.columnIdToDcitionaryMapping = columnIdToDcitionaryMapping;
- }
-
public boolean isRawRecordDetailQuery() {
return isRawRecordDetailQuery;
}
@@ -643,4 +631,32 @@ public class BlockExecutionInfo {
this.projectionListMeasureIndexes = projectionListMeasureIndexes;
}
+ /**
+ * @return delete delta files
+ */
+ public String[] getDeleteDeltaFilePath() {
+ return deleteDeltaFilePath;
+ }
+
+ /**
+ * set the delete delta files
+ * @param deleteDeltaFilePath
+ */
+ public void setDeleteDeltaFilePath(String[] deleteDeltaFilePath) {
+ this.deleteDeltaFilePath = deleteDeltaFilePath;
+ }
+
+ /**
+ * @return deleted record map
+ */
+ public Map<String, DeleteDeltaVo> getDeletedRecordsMap() {
+ return deletedRecordsMap;
+ }
+
+ /**
+ * @param deletedRecordsMap
+ */
+ public void setDeletedRecordsMap(Map<String, DeleteDeltaVo> deletedRecordsMap) {
+ this.deletedRecordsMap = deletedRecordsMap;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java
new file mode 100644
index 0000000..52fa529
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.executor.infos;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+
+/**
+ * class to hold information about delete delta files
+ */
+public class DeleteDeltaInfo {
+
+ /**
+ * delete delta files
+ */
+ private String[] deleteDeltaFile;
+
+ /**
+ * latest delete delta file timestamp
+ */
+ private long latestDeleteDeltaFileTimestamp;
+
+ public DeleteDeltaInfo(String[] deleteDeltaFile) {
+ this.deleteDeltaFile = deleteDeltaFile;
+ this.latestDeleteDeltaFileTimestamp =
+ CarbonUpdateUtil.getLatestDeleteDeltaTimestamp(deleteDeltaFile);
+ }
+
+ public String[] getDeleteDeltaFile() {
+ return deleteDeltaFile;
+ }
+
+ public long getLatestDeleteDeltaFileTimestamp() {
+ return latestDeleteDeltaFileTimestamp;
+ }
+
+ @Override public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(deleteDeltaFile);
+ result =
+ prime * result + (int) (latestDeleteDeltaFileTimestamp ^ (latestDeleteDeltaFileTimestamp
+ >>> 32));
+ return result;
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ DeleteDeltaInfo other = (DeleteDeltaInfo) obj;
+ if (!Arrays.equals(deleteDeltaFile, other.deleteDeltaFile)) {
+ return false;
+ }
+ if (latestDeleteDeltaFileTimestamp != other.latestDeleteDeltaFileTimestamp) {
+ return false;
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index 1dda1aa..c24b73c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -25,11 +25,13 @@ import java.util.Map;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
+import org.apache.carbondata.core.mutate.TupleIdEnum;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -125,7 +127,20 @@ public abstract class AbstractScannedResult {
*/
private int[] complexParentBlockIndexes;
- protected BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache;
+ /**
+ * blockletId + pageNumber to deleted record map
+ */
+ private Map<String, DeleteDeltaVo> deletedRecordMap;
+
+ /**
+ * current page delete delta vo
+ */
+ private DeleteDeltaVo currentDeleteDeltaVo;
+
+ /**
+ * actual blocklet number
+ */
+ private String blockletNumber;
public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) {
this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
@@ -135,6 +150,7 @@ public abstract class AbstractScannedResult {
this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap();
this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes();
this.totalDimensionsSize = blockExecutionInfo.getQueryDimensions().length;
+ this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap();
}
/**
@@ -393,6 +409,12 @@ public abstract class AbstractScannedResult {
*/
public void setBlockletId(String blockletId) {
this.blockletId = CarbonTablePath.getShortBlockId(blockletId);
+ blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID);
+ // if deleted records map is present for this block
+ // then get the first page deleted vo
+ if (null != deletedRecordMap) {
+ currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
+ }
}
/**
@@ -457,6 +479,9 @@ public abstract class AbstractScannedResult {
pageCounter++;
rowCounter = 0;
currentRow = -1;
+ // key must match the "blockletNumber_pageCounter" format used in setBlockletId
+ if (null != deletedRecordMap) {
+ currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
+ }
return hasNext();
}
return false;
@@ -629,21 +654,6 @@ public abstract class AbstractScannedResult {
public abstract String[] getNoDictionaryKeyStringArray();
/**
- * @return BlockletLevelDeleteDeltaDataCache.
- */
- public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
- return blockletDeleteDeltaCache;
- }
-
- /**
- * @param blockletDeleteDeltaCache
- */
- public void setBlockletDeleteDeltaCache(
- BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache) {
- this.blockletDeleteDeltaCache = blockletDeleteDeltaCache;
- }
-
- /**
* Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
* @param columnarBatch
* @param startRow
@@ -653,11 +663,11 @@ public abstract class AbstractScannedResult {
public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
int vectorOffset) {
int rowsFiltered = 0;
- if (blockletDeleteDeltaCache != null) {
+ if (currentDeleteDeltaVo != null) {
int len = startRow + size;
for (int i = startRow; i < len; i++) {
int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i;
- if (blockletDeleteDeltaCache.contains(rowId, pageCounter)) {
+ if (currentDeleteDeltaVo.containsRow(rowId)) {
columnarBatch.markFiltered(vectorOffset);
rowsFiltered++;
}
@@ -666,4 +676,17 @@ public abstract class AbstractScannedResult {
}
return rowsFiltered;
}
+
+ /**
+ * Below method will be used to check row got deleted
+ *
+ * @param rowId
+ * @return true if the given row id is present in the deleted rows for the current page
+ */
+ public boolean containsDeletedRow(int rowId) {
+ if (null != currentDeleteDeltaVo) {
+ return currentDeleteDeltaVo.containsRow(rowId);
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index a0823af..92e9594 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -18,6 +18,8 @@ package org.apache.carbondata.core.scan.result.iterator;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.carbondata.common.CarbonIterator;
@@ -27,9 +29,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.DataRefNode;
import org.apache.carbondata.core.datastore.DataRefNodeFinder;
import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
+import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.executor.infos.DeleteDeltaInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator;
import org.apache.carbondata.core.scan.processor.impl.DataBlockIteratorImpl;
@@ -53,6 +59,9 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
private static final LogService LOGGER =
LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName());
+ private static final Map<DeleteDeltaInfo, Object> deleteDeltaToLockObjectMap =
+ new ConcurrentHashMap<>();
+
protected ExecutorService execService;
/**
* execution info of the block
@@ -77,7 +86,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
/**
* queryStatisticsModel to store query statistics object
*/
- QueryStatisticsModel queryStatisticsModel;
+ private QueryStatisticsModel queryStatisticsModel;
public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
ExecutorService execService) {
@@ -105,13 +114,24 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
private void intialiseInfos() {
for (BlockExecutionInfo blockInfo : blockExecutionInfos) {
- DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
+ Map<String, DeleteDeltaVo> deletedRowsMap = null;
+ DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize(),
+ blockInfo.getDataBlock().getSegmentProperties().getNumberOfSortColumns(),
+ blockInfo.getDataBlock().getSegmentProperties().getNumberOfNoDictSortColumns());
+ // if delete delta file is present
+ if (null != blockInfo.getDeleteDeltaFilePath() && 0 != blockInfo
+ .getDeleteDeltaFilePath().length) {
+ DeleteDeltaInfo deleteDeltaInfo = new DeleteDeltaInfo(blockInfo.getDeleteDeltaFilePath());
+ // read and get the delete detail block details
+ deletedRowsMap = getDeleteDeltaDetails(blockInfo.getDataBlock(), deleteDeltaInfo);
+ // set the deleted row to block execution info
+ blockInfo.setDeletedRecordsMap(deletedRowsMap);
+ }
DataRefNode startDataBlock = finder
.findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
startDataBlock = startDataBlock.getNextDataRefNode();
}
-
long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
//if number of block is less than 0 then take end block.
if (numberOfBlockToScan <= 0) {
@@ -124,6 +144,83 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
}
}
+ /**
+ * Below method will be used to get the delete delta rows for a block
+ *
+ * @param dataBlock data block
+ * @param deleteDeltaInfo delete delta info
+ * @return mapping of "blockletId_pageId" to deleted rows
+ */
+ private Map<String, DeleteDeltaVo> getDeleteDeltaDetails(AbstractIndex dataBlock,
+ DeleteDeltaInfo deleteDeltaInfo) {
+ // if the data block's delete delta timestamp is greater than or equal to the latest
+ // then return the current deleted rows
+ if (dataBlock.getDeleteDeltaTimestamp() >= deleteDeltaInfo
+ .getLatestDeleteDeltaFileTimestamp()) {
+ return dataBlock.getDeletedRowsMap();
+ }
+ CarbonDeleteFilesDataReader carbonDeleteDeltaFileReader = null;
+ // get the lock object so in case of concurrent query only one task will read the delete delta
+ // files other tasks will wait
+ Object lockObject = deleteDeltaToLockObjectMap.get(deleteDeltaInfo);
+ // if lock object is null then add a lock object
+ if (null == lockObject) {
+ synchronized (deleteDeltaToLockObjectMap) {
+ // double checking
+ lockObject = deleteDeltaToLockObjectMap.get(deleteDeltaInfo);
+ if (null == lockObject) {
+ lockObject = new Object();
+ deleteDeltaToLockObjectMap.put(deleteDeltaInfo, lockObject);
+ }
+ }
+ }
+ // double checking to check the deleted rows is already present or not
+ if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp()) {
+ // if not then acquire the lock
+ synchronized (lockObject) {
+ // check the timestamp again
+ if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo
+ .getLatestDeleteDeltaFileTimestamp()) {
+ // read the delete delta files
+ carbonDeleteDeltaFileReader = new CarbonDeleteFilesDataReader();
+ Map<String, DeleteDeltaVo> deletedRowsMap = carbonDeleteDeltaFileReader
+ .getDeletedRowsDataVo(deleteDeltaInfo.getDeleteDeltaFile());
+ setDeltedDeltaBoToDataBlock(deleteDeltaInfo, deletedRowsMap, dataBlock);
+ // remove the lock
+ deleteDeltaToLockObjectMap.remove(deleteDeltaInfo);
+ return deletedRowsMap;
+ } else {
+ return dataBlock.getDeletedRowsMap();
+ }
+ }
+ } else {
+ return dataBlock.getDeletedRowsMap();
+ }
+ }
+
+ /**
+ * Below method will be used to set deleted records map to data block
+ * based on latest delta file timestamp
+ *
+ * @param deleteDeltaInfo
+ * @param deletedRecordsMap
+ * @param dataBlock
+ */
+ private void setDeltedDeltaBoToDataBlock(DeleteDeltaInfo deleteDeltaInfo,
+ Map<String, DeleteDeltaVo> deletedRecordsMap, AbstractIndex dataBlock) {
+ // check if timestamp of data block is less than the latest delete delta timestamp
+ // then update the delete delta details and timestamp in data block
+ if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp()) {
+ synchronized (dataBlock) {
+ if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo
+ .getLatestDeleteDeltaFileTimestamp()) {
+ dataBlock.setDeletedRowsMap(deletedRecordsMap);
+ dataBlock.setDeleteDeltaTimestamp(deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp());
+ }
+ }
+ }
+ }
+
@Override public boolean hasNext() {
if ((dataBlockIterator != null && dataBlockIterator.hasNext())) {
return true;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index 0fb9782..f3d1336 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -23,8 +23,6 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
-import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -114,13 +112,6 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
}
}
scannedResult.setNumberOfRows(numberOfRows);
- // loading delete data cache in blockexecutioninfo instance
- DeleteDeltaCacheLoaderIntf deleteCacheLoader =
- new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
- blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
- deleteCacheLoader.loadDeleteDeltaFileDataToCache();
- scannedResult
- .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
// adding statistics for carbon scan time
QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index 8f14b85..e710e40 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -26,8 +26,6 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
-import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
@@ -198,17 +196,9 @@ public class FilterScanner extends AbstractBlockletScanner {
indexesGroup[k] = indexes;
}
}
- // loading delete data cache in blockexecutioninfo instance
- DeleteDeltaCacheLoaderIntf deleteCacheLoader =
- new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
- blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
- deleteCacheLoader.loadDeleteDeltaFileDataToCache();
- scannedResult
- .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
FileHolder fileReader = blocksChunkHolder.getFileReader();
int[][] allSelectedDimensionBlocksIndexes =
blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
-
long dimensionReadTime = System.currentTimeMillis();
DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
.getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 6fab563..5e6e8de 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -261,7 +261,22 @@ public class SegmentUpdateStatusManager {
return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId);
}
-
+ /**
+ * Below method will be used to get all the delete delta files based on block name
+ *
+ * @param blockFilePath actual block filePath
+ * @return all delete delta files
+ * @throws Exception
+ */
+ public String[] getDeleteDeltaFilePath(String blockFilePath) throws Exception {
+ int tableFactPathLength = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier()).getFactDir().length() + 1;
+ String blockame = blockFilePath.substring(tableFactPathLength);
+ String tupleId = CarbonTablePath.getShortBlockId(blockame);
+ return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+ .toArray(new String[0]);
+ }
/**
* Returns all delta file paths of specified block
@@ -291,11 +306,8 @@ public class SegmentUpdateStatusManager {
//blockName without timestamp
final String blockNameFromTuple =
blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
- SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
- readLoadMetadata();
- return getDeltaFiles(file, blockNameFromTuple, listOfSegmentUpdateDetailsArray, extension,
+ return getDeltaFiles(file, blockNameFromTuple, extension,
segment);
-
} catch (Exception ex) {
String errorMsg = "Invalid tuple id " + tupleId;
LOG.error(errorMsg);
@@ -345,12 +357,11 @@ public class SegmentUpdateStatusManager {
* @param extension
* @return
*/
- public List<String> getDeltaFiles(CarbonFile blockDir, final String blockNameFromTuple,
- SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray,
+ private List<String> getDeltaFiles(CarbonFile blockDir, final String blockNameFromTuple,
final String extension,
String segment) {
- List<String> deleteFileList = null;
- for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
+ List<String> deleteFileList = new ArrayList<>();
+ for (SegmentUpdateDetails block : updateDetails) {
if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName()
.equalsIgnoreCase(segment) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
index c66398c..982fb50 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
@@ -62,7 +62,7 @@ public class SegmentTaskIndexStoreTest {
<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
createCache(CacheType.DRIVER_BTREE, "");
tableBlockInfo = new TableBlockInfo("file", 0L, "SG100", locations, 10L,
- ColumnarFormatVersion.valueOf(version));
+ ColumnarFormatVersion.valueOf(version), null);
absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp",
new CarbonTableIdentifier("testdatabase", "testtable", "TB100"));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java
index 08c22ec..1b7f106 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java
@@ -27,7 +27,7 @@ public class BlockInfoTest {
static BlockInfo blockInfo;
@BeforeClass public static void setup() {
- blockInfo = new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1));
+ blockInfo = new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null));
}
@Test public void hashCodeTest() {
@@ -43,7 +43,7 @@ public class BlockInfoTest {
@Test public void equalsTestWithSimilarObject() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1));
+ new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null));
Boolean res = blockInfo.equals(blockInfoTest);
assert (res);
}
@@ -60,28 +60,28 @@ public class BlockInfoTest {
@Test public void equalsTestWithDifferentSegmentId() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1));
+ new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1, null));
Boolean res = blockInfo.equals(blockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDifferentOffset() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("/filePath.carbondata", 62, "segmentId", null, 6, ColumnarFormatVersion.V1));
+ new BlockInfo(new TableBlockInfo("/filePath.carbondata", 62, "segmentId", null, 6, ColumnarFormatVersion.V1, null));
Boolean res = blockInfo.equals(blockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDifferentBlockLength() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1));
+ new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1, null));
Boolean res = blockInfo.equals(blockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDiffFilePath() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("/diffFilePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1));
+ new BlockInfo(new TableBlockInfo("/diffFilePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1, null));
Boolean res = blockInfoTest.equals(blockInfo);
assert (!res);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
index 840287e..f4553a6 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
@@ -33,8 +33,8 @@ public class TableBlockInfoTest {
static TableBlockInfo tableBlockInfos;
@BeforeClass public static void setup() {
- tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1);
- tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
+ tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1, null);
+ tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1, null);
}
@Test public void equalTestWithSameObject() {
@@ -43,7 +43,7 @@ public class TableBlockInfoTest {
}
@Test public void equalTestWithSimilarObject() {
- TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1);
+ TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1, null);
Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
assert (res);
}
@@ -59,52 +59,52 @@ public class TableBlockInfoTest {
}
@Test public void equlsTestWithDiffSegmentId() {
- TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, ColumnarFormatVersion.V1);
+ TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, ColumnarFormatVersion.V1, null);
Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void equlsTestWithDiffBlockOffset() {
- TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1);
+ TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null);
Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDiffBlockLength() {
- TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, ColumnarFormatVersion.V1);
+ TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, ColumnarFormatVersion.V1, null);
Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDiffBlockletNumber() {
TableBlockInfo tableBlockInfoTest =
- new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+ new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDiffFilePath() {
TableBlockInfo tableBlockInfoTest =
- new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+ new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void compareToTestForSegmentId() {
TableBlockInfo tableBlockInfo =
- new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+ new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
int res = tableBlockInfos.compareTo(tableBlockInfo);
int expectedResult = 2;
assertEquals(res, expectedResult);
TableBlockInfo tableBlockInfo1 =
- new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+ new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
int expectedResult1 = -1;
assertEquals(res1, expectedResult1);
TableBlockInfo tableBlockInfo2 =
- new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+ new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
int res2 = tableBlockInfos.compareTo(tableBlockInfo2);
int expectedresult2 = 1;
assertEquals(res2, expectedresult2);
@@ -129,18 +129,18 @@ public class TableBlockInfoTest {
};
- TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1);
+ TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1, null);
int res = tableBlockInfos.compareTo(tableBlockInfo);
int expectedResult = 7;
assertEquals(res, expectedResult);
- TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1);
+ TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1, null);
int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
int expectedResult1 = 1;
assertEquals(res1, expectedResult1);
TableBlockInfo tableBlockInfoTest =
- new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
+ new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1, null);
int res2 = tableBlockInfos.compareTo(tableBlockInfoTest);
int expectedResult2 = -1;
assertEquals(res2, expectedResult2);
@@ -148,13 +148,13 @@ public class TableBlockInfoTest {
@Test public void compareToTestWithStartBlockletNo() {
TableBlockInfo tableBlockInfo =
- new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+ new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
int res = tableBlockInfos.compareTo(tableBlockInfo);
int expectedresult =-1;
assertEquals(res, expectedresult);
TableBlockInfo tableBlockInfo1 =
- new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), ColumnarFormatVersion.V1);
+ new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), ColumnarFormatVersion.V1, null);
int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
int expectedresult1 = 1;
assertEquals(res1, expectedresult1);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
index 52c56d3..ccc7af6 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
@@ -33,10 +33,10 @@ public class TableTaskInfoTest {
tableBlockInfoList = new ArrayList<>(5);
String[] locations = { "loc1", "loc2", "loc3" };
- tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1));
+ tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1, null));
String[] locs = { "loc4", "loc5" };
- tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, ColumnarFormatVersion.V1));
+ tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, ColumnarFormatVersion.V1, null));
tableTaskInfo = new TableTaskInfo("taskId", tableBlockInfoList);
}
@@ -67,10 +67,10 @@ public class TableTaskInfoTest {
List<TableBlockInfo> tableBlockInfoListTest = new ArrayList<>();
String[] locations = { "loc1", "loc2", "loc3" };
- tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1));
+ tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1, null));
String[] locations1 = { "loc1", "loc2", "loc3" };
- tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, ColumnarFormatVersion.V1));
+ tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, ColumnarFormatVersion.V1, null));
List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoListTest);
assert (res.equals(locs));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 9adf4d4..badf63e 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -516,7 +516,7 @@ public class CarbonUtilTest {
}
};
TableBlockInfo info =
- new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
+ new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null);
assertEquals(CarbonUtil.readMetadatFile(info).getVersionId().number(), 1);
}
@@ -525,7 +525,7 @@ public class CarbonUtilTest {
public void testToReadMetadatFileWithException()
throws Exception {
TableBlockInfo info =
- new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
+ new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null);
CarbonUtil.readMetadatFile(info);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index 83c7fa4..8161fae 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -142,12 +142,14 @@ public class DataFileFooterConverterTest {
}
};
String[] arr = { "a", "b", "c" };
- TableBlockInfo tableBlockInfo = new TableBlockInfo("/file.carbondata", 3, "id", arr, 3, ColumnarFormatVersion.V1);
+ String fileName = "/part-0-0_batchno0-0-1495074251740.carbondata";
+ TableBlockInfo tableBlockInfo = new TableBlockInfo(fileName, 3, "id", arr, 3, ColumnarFormatVersion.V1, null);
tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3);
List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
tableBlockInfoList.add(tableBlockInfo);
+ String idxFileName = "0_batchno0-0-1495074251740.carbonindex";
List<DataFileFooter> dataFileFooterList =
- dataFileFooterConverter.getIndexInfo("indexfile", tableBlockInfoList);
+ dataFileFooterConverter.getIndexInfo(idxFileName, tableBlockInfoList);
byte[] exp = dataFileFooterList.get(0).getBlockletIndex().getBtreeIndex().getStartKey();
byte[] res = "1".getBytes();
for (int i = 0; i < exp.length; i++) {
@@ -244,7 +246,7 @@ public class DataFileFooterConverterTest {
segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols());
dataFileFooter.setNumberOfRows(3);
dataFileFooter.setSegmentInfo(segmentInfo);
- TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
+ TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null);
DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info);
assertEquals(result.getNumberOfRows(), 3);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index cda34e4..5d9bbe7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -323,10 +323,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
updateStatusManager)) {
continue;
}
+ String[] deleteDeltaFilePath = null;
+ try {
+ deleteDeltaFilePath =
+ updateStatusManager.getDeleteDeltaFilePath(tableBlockInfo.getFilePath());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()),
tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
- tableBlockInfo.getVersion()));
+ tableBlockInfo.getVersion(), deleteDeltaFilePath));
}
}
return result;
@@ -429,7 +436,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
- carbonInputSplit.getBlockStorageIdMap()));
+ carbonInputSplit.getBlockStorageIdMap(), carbonInputSplit.getDeleteDeltaFiles()));
}
}
return tableBlockInfoList;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 08661a2..631bc2c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -72,6 +72,11 @@ public class CarbonInputSplit extends FileSplit
private List<UpdateVO> invalidTimestampsList;
+ /**
+ * list of delete delta files for split
+ */
+ private String[] deleteDeltaFiles;
+
public CarbonInputSplit() {
segmentId = null;
taskId = "0";
@@ -82,7 +87,7 @@ public class CarbonInputSplit extends FileSplit
}
private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
- ColumnarFormatVersion version) {
+ ColumnarFormatVersion version, String[] deleteDeltaFiles) {
super(path, start, length, locations);
this.segmentId = segmentId;
String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -93,11 +98,12 @@ public class CarbonInputSplit extends FileSplit
this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
this.invalidSegments = new ArrayList<>();
this.version = version;
+ this.deleteDeltaFiles = deleteDeltaFiles;
}
public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
- int numberOfBlocklets, ColumnarFormatVersion version) {
- this(segmentId, path, start, length, locations, version);
+ int numberOfBlocklets, ColumnarFormatVersion version, String[] deleteDeltaFiles) {
+ this(segmentId, path, start, length, locations, version, deleteDeltaFiles);
this.numberOfBlocklets = numberOfBlocklets;
}
@@ -113,8 +119,9 @@ public class CarbonInputSplit extends FileSplit
* @param blockStorageIdMap
*/
public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
- int numberOfBlocklets, ColumnarFormatVersion version, Map<String, String> blockStorageIdMap) {
- this(segmentId, path, start, length, locations, numberOfBlocklets, version);
+ int numberOfBlocklets, ColumnarFormatVersion version, Map<String, String> blockStorageIdMap,
+ String[] deleteDeltaFiles) {
+ this(segmentId, path, start, length, locations, numberOfBlocklets, version, deleteDeltaFiles);
this.blockStorageIdMap = blockStorageIdMap;
}
@@ -122,7 +129,7 @@ public class CarbonInputSplit extends FileSplit
ColumnarFormatVersion version)
throws IOException {
return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
- split.getLocations(), version);
+ split.getLocations(), version, null);
}
public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
@@ -133,7 +140,8 @@ public class CarbonInputSplit extends FileSplit
try {
tableBlockInfoList.add(
new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
- split.getLocations(), split.getLength(), blockletInfos, split.getVersion()));
+ split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
+ split.getDeleteDeltaFiles()));
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + split, e);
}
@@ -147,7 +155,7 @@ public class CarbonInputSplit extends FileSplit
try {
return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
- blockletInfos, inputSplit.getVersion());
+ blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + inputSplit, e);
}
@@ -167,6 +175,11 @@ public class CarbonInputSplit extends FileSplit
for (int i = 0; i < numInvalidSegment; i++) {
invalidSegments.add(in.readUTF());
}
+ int numberOfDeleteDeltaFiles = in.readInt();
+ deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
+ for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
+ deleteDeltaFiles[i] = in.readUTF();
+ }
}
@Override public void write(DataOutput out) throws IOException {
@@ -178,6 +191,12 @@ public class CarbonInputSplit extends FileSplit
for (String invalidSegment : invalidSegments) {
out.writeUTF(invalidSegment);
}
+ out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
+ if (null != deleteDeltaFiles) {
+ for (int i = 0; i < deleteDeltaFiles.length; i++) {
+ out.writeUTF(deleteDeltaFiles[i]);
+ }
+ }
}
public List<String> getInvalidSegments() {
@@ -287,4 +306,8 @@ public class CarbonInputSplit extends FileSplit
public Map<String, String> getBlockStorageIdMap() {
return blockStorageIdMap;
}
+
+ public String[] getDeleteDeltaFiles() {
+ return deleteDeltaFiles;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index 7ba6133..f9dc178 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -90,7 +90,7 @@ class InMemoryBTreeIndex implements Index {
result.add(new CarbonInputSplit(segment.getId(), new Path(tableBlockInfo.getFilePath()),
tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
- tableBlockInfo.getVersion()));
+ tableBlockInfo.getVersion(), null));
}
return result;
}
@@ -142,7 +142,8 @@ class InMemoryBTreeIndex implements Index {
tableBlockInfoList.add(
new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
segment.getId(), carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
- blockletInfos, carbonInputSplit.getVersion()));
+ blockletInfos, carbonInputSplit.getVersion(),
+ carbonInputSplit.getDeleteDeltaFiles()));
}
return tableBlockInfoList;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 4ebbf60..2898870 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -300,7 +300,8 @@ class CarbonMergerRDD[K, V](
carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
val blockInfo = new TableBlockInfo(entry.getPath.toString,
entry.getStart, entry.getSegmentId,
- entry.getLocations, entry.getLength, entry.getVersion
+ entry.getLocations, entry.getLength, entry.getVersion,
+ updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
)
!CarbonUtil
.isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3d2e35b..dfea7d7 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -564,7 +564,7 @@ object CarbonDataRDDFactory {
val fileSplit = inputSplit.asInstanceOf[FileSplit]
new TableBlockInfo(fileSplit.getPath.toString,
fileSplit.getStart, "1",
- fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1
+ fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
).asInstanceOf[Distributable]
}
// group blocks to nodes, tasks
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cab78fe..96a8062 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -577,7 +577,7 @@ object CarbonDataRDDFactory {
val fileSplit = inputSplit.asInstanceOf[FileSplit]
new TableBlockInfo(fileSplit.getPath.toString,
fileSplit.getStart, "1",
- fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1
+ fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
).asInstanceOf[Distributable]
}
// group blocks to nodes, tasks