Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/09/16 17:05:38 UTC
[carbondata] branch master updated: [CARBONDATA-3454] optimized index server output for count(*)
This is an automated email from the ASF dual-hosted git repository.
kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 41ac71a [CARBONDATA-3454] optimized index server output for count(*)
41ac71a is described below
commit 41ac71a7ef96a6725ee9b6a8f26bf4836bd535f9
Author: kunal642 <ku...@gmail.com>
AuthorDate: Thu Jun 27 14:32:11 2019 +0530
[CARBONDATA-3454] optimized index server output for count(*)
Optimized the output for count(*) queries so that only a long is sent back to the driver, reducing the network transfer cost for the index server
This closes #3308
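The driver-side entry point added by this change works roughly as sketched below. This is a minimal illustration derived from the getDistributedCount method introduced in CarbonInputFormat in this diff; the wrapper class name and the simplified exception handling are illustrative only, not part of the commit.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapUtil;
import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public final class CountStarSketch {

  /**
   * Roughly the same flow as CarbonInputFormat.getDistributedCount in this diff:
   * fire a DistributedDataMapJob whose executeCountJob returns a single Long
   * (summed on the index server by DistributedCountRDD), and fall back to the
   * embedded job in the driver if the index server call fails.
   */
  static Long distributedCount(CarbonTable table, List<PartitionSpec> partitions,
      List<Segment> validSegments) throws IOException {
    DistributableDataMapFormat dataMapFormat =
        new DistributableDataMapFormat(table, null, validSegments, new ArrayList<String>(),
            partitions, false, null, false);
    // The result is a single long, so there is no need to spill it to a file.
    dataMapFormat.setIsWriteToFile(false);
    try {
      DataMapJob dataMapJob =
          (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
      if (dataMapJob == null) {
        throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
      }
      return dataMapJob.executeCountJob(dataMapFormat);
    } catch (Exception e) {
      // Fallback mirrors the commit: run the count with the embedded job instead.
      return DataMapUtil.getEmbeddedJob().executeCountJob(dataMapFormat);
    }
  }
}

On the getDistributedBlockRowCount path the same idea is applied per blocklet: setCountStarJob() makes ExtendedBlocklet.serializeData write only the row count and segment id instead of the full CarbonInputSplit, so only a long crosses the network per blocklet rather than the whole split.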
---
.../apache/carbondata/core/datamap/DataMapJob.java | 2 +
.../carbondata/core/datamap/DataMapUtil.java | 13 ++-
.../core/datamap/DistributableDataMapFormat.java | 34 +++++--
.../core/indexstore/ExtendedBlocklet.java | 68 ++++++++-----
.../core/indexstore/ExtendedBlockletWrapper.java | 27 +++--
.../ExtendedBlockletWrapperContainer.java | 19 ++--
.../carbondata/hadoop/api/CarbonInputFormat.java | 52 ++++++++--
.../hadoop/api/CarbonTableInputFormat.java | 22 ++--
.../carbondata/indexserver/DataMapJobs.scala | 15 ++-
.../indexserver/DistributedCountRDD.scala | 111 +++++++++++++++++++++
.../indexserver/DistributedPruneRDD.scala | 29 ++----
.../indexserver/DistributedRDDUtils.scala | 13 +++
.../carbondata/indexserver/IndexServer.scala | 19 ++++
13 files changed, 319 insertions(+), 105 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
index 9eafe7c..326282d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
@@ -35,4 +35,6 @@ public interface DataMapJob extends Serializable {
List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat);
+ Long executeCountJob(DistributableDataMapFormat dataMapFormat);
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index dd9debc..bca7409 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -230,7 +230,7 @@ public class DataMapUtil {
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
List<String> segmentsToBeRefreshed) throws IOException {
return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
- invalidSegments, level, false, segmentsToBeRefreshed);
+ invalidSegments, level, false, segmentsToBeRefreshed, false);
}
/**
@@ -241,7 +241,8 @@ public class DataMapUtil {
public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
- Boolean isFallbackJob, List<String> segmentsToBeRefreshed) throws IOException {
+ Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob)
+ throws IOException {
List<String> invalidSegmentNo = new ArrayList<>();
for (Segment segment : invalidSegments) {
invalidSegmentNo.add(segment.getSegmentNo());
@@ -250,9 +251,11 @@ public class DataMapUtil {
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
partitionsToPrune, false, level, isFallbackJob);
- List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat);
- // Apply expression on the blocklets.
- return prunedBlocklets;
+ if (isCountJob) {
+ dataMapFormat.setCountStarJob();
+ dataMapFormat.setIsWriteToFile(false);
+ }
+ return dataMapJob.execute(dataMapFormat);
}
public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 8426fcb..b430c5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -28,7 +28,6 @@ import java.util.UUID;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -91,6 +90,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
private boolean isWriteToFile = true;
+ private boolean isCountStarJob = false;
+
DistributableDataMapFormat() {
}
@@ -103,7 +104,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
this.dataMapToClear = dataMapToClear;
}
- DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
+ public DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions,
boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob)
throws IOException {
@@ -136,7 +137,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
return new RecordReader<Void, ExtendedBlocklet>() {
private Iterator<ExtendedBlocklet> blockletIterator;
private ExtendedBlocklet currBlocklet;
- private List<DataMap> dataMaps;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -149,7 +149,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
if (dataMapLevel == null) {
TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
.getDataMap(table, distributable.getDistributable().getDataMapSchema());
- dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable());
blocklets = defaultDataMap
.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), partitions);
blocklets = DataMapUtil
@@ -192,11 +191,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
@Override
public void close() throws IOException {
- if (null != dataMaps) {
- for (DataMap dataMap : dataMaps) {
- dataMap.finish();
- }
- }
}
};
}
@@ -247,6 +241,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
out.writeUTF(taskGroupDesc);
out.writeUTF(queryId);
out.writeBoolean(isWriteToFile);
+ out.writeBoolean(isCountStarJob);
}
@Override
@@ -292,6 +287,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
this.taskGroupDesc = in.readUTF();
this.queryId = in.readUTF();
this.isWriteToFile = in.readBoolean();
+ this.isCountStarJob = in.readBoolean();
}
private void initReadCommittedScope() throws IOException {
@@ -398,9 +394,29 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
return validSegments;
}
+ public List<Segment> getValidSegments() {
+ return validSegments;
+ }
+
public void createDataMapChooser() throws IOException {
if (null != filterResolverIntf) {
this.dataMapChooser = new DataMapChooser(table);
}
}
+
+ public void setCountStarJob() {
+ this.isCountStarJob = true;
+ }
+
+ public boolean isCountStarJob() {
+ return this.isCountStarJob;
+ }
+
+ public List<PartitionSpec> getPartitions() {
+ return partitions;
+ }
+
+ public ReadCommittedScope getReadCommittedScope() {
+ return readCommittedScope;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index a85423b..611e969 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -39,6 +39,10 @@ public class ExtendedBlocklet extends Blocklet {
private CarbonInputSplit inputSplit;
+ private Long count;
+
+ private String segmentNo;
+
public ExtendedBlocklet() {
}
@@ -78,6 +82,9 @@ public class ExtendedBlocklet extends Blocklet {
}
public String getSegmentId() {
+ if (segmentNo != null) {
+ return segmentNo;
+ }
return this.inputSplit.getSegmentId();
}
@@ -92,8 +99,12 @@ public class ExtendedBlocklet extends Blocklet {
return getFilePath();
}
- public String getDataMapWriterPath() {
- return this.inputSplit.getDataMapWritePath();
+ public Long getRowCount() {
+ if (count != null) {
+ return count;
+ } else {
+ return (long) inputSplit.getRowCount();
+ }
}
public void setDataMapWriterPath(String dataMapWriterPath) {
@@ -161,30 +172,35 @@ public class ExtendedBlocklet extends Blocklet {
* @param uniqueLocation
* @throws IOException
*/
- public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
+ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob)
throws IOException {
super.write(out);
- if (dataMapUniqueId == null) {
- out.writeBoolean(false);
+ if (isCountJob) {
+ out.writeLong(inputSplit.getRowCount());
+ out.writeUTF(inputSplit.getSegmentId());
} else {
- out.writeBoolean(true);
- out.writeUTF(dataMapUniqueId);
- }
- out.writeBoolean(inputSplit != null);
- if (inputSplit != null) {
- // creating byte array output stream to get the size of input split serializeData size
- ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(ebos);
- inputSplit.setFilePath(null);
- inputSplit.setBucketId(null);
- if (inputSplit.isBlockCache()) {
- inputSplit.updateFooteroffset();
- inputSplit.updateBlockLength();
- inputSplit.setWriteDetailInfo(false);
+ if (dataMapUniqueId == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeUTF(dataMapUniqueId);
+ }
+ out.writeBoolean(inputSplit != null);
+ if (inputSplit != null) {
+ // creating byte array output stream to get the size of input split serializeData size
+ ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(ebos);
+ inputSplit.setFilePath(null);
+ inputSplit.setBucketId(null);
+ if (inputSplit.isBlockCache()) {
+ inputSplit.updateFooteroffset();
+ inputSplit.updateBlockLength();
+ inputSplit.setWriteDetailInfo(false);
+ }
+ inputSplit.serializeFields(dos, uniqueLocation);
+ out.writeInt(ebos.size());
+ out.write(ebos.getBuffer(), 0, ebos.size());
}
- inputSplit.serializeFields(dos, uniqueLocation);
- out.writeInt(ebos.size());
- out.write(ebos.getBuffer(), 0 , ebos.size());
}
}
@@ -195,9 +211,15 @@ public class ExtendedBlocklet extends Blocklet {
* @param tablePath
* @throws IOException
*/
- public void deserializeFields(DataInput in, String[] locations, String tablePath)
+ public void deserializeFields(DataInput in, String[] locations, String tablePath,
+ boolean isCountJob)
throws IOException {
super.readFields(in);
+ if (isCountJob) {
+ count = in.readLong();
+ segmentNo = in.readUTF();
+ return;
+ }
if (in.readBoolean()) {
dataMapUniqueId = in.readUTF();
}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
index ab051ea..f722f32 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -56,25 +56,20 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
private static final Logger LOGGER =
LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());
-
+ private static final int BUFFER_SIZE = 8 * 1024 * 1024;
+ private static final int BLOCK_SIZE = 256 * 1024 * 1024;
private boolean isWrittenToFile;
-
private int dataSize;
-
private byte[] bytes;
- private static final int BUFFER_SIZE = 8 * 1024 * 1024;
-
- private static final int BLOCK_SIZE = 256 * 1024 * 1024;
-
public ExtendedBlockletWrapper() {
}
public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
- String queryId, boolean isWriteToFile) {
+ String queryId, boolean isWriteToFile, boolean isCountJob) {
Map<String, Short> uniqueLocations = new HashMap<>();
- byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList);
+ byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob);
int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
@@ -122,13 +117,13 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
}
private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
- List<ExtendedBlocklet> extendedBlockletList) {
+ List<ExtendedBlocklet> extendedBlockletList, boolean isCountJob) {
ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
DataOutputStream stream = new DataOutputStream(bos);
try {
for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
- extendedBlocklet.serializeData(stream, uniqueLocations);
+ extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
}
return new SnappyCompressor().compressByte(bos.toByteArray());
} catch (IOException e) {
@@ -142,6 +137,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
* Below method will be used to write the data to stream[file/memory]
* Data Format
* <number of splits><number of unique location[short]><locations><serialize data len><data>
+ *
* @param stream
* @param data
* @param uniqueLocation
@@ -158,7 +154,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
final Map.Entry<String, Short> next = iterator.next();
uniqueLoc[next.getValue()] = next.getKey();
}
- stream.writeShort((short)uniqueLoc.length);
+ stream.writeShort((short) uniqueLoc.length);
for (String loc : uniqueLoc) {
stream.writeUTF(loc);
}
@@ -170,12 +166,14 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
* deseralize the blocklet data from file or stream
* data format
* <number of splits><number of unique location[short]><locations><serialize data len><data>
+ *
* @param tablePath
* @param queryId
* @return
* @throws IOException
*/
- public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) throws IOException {
+ public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob)
+ throws IOException {
byte[] data;
if (bytes != null) {
if (isWrittenToFile) {
@@ -218,7 +216,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
try {
for (int i = 0; i < numberOfBlocklet; i++) {
ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
- extendedBlocklet.deserializeFields(eDIS, locations, tablePath);
+ extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob);
extendedBlockletList.add(extendedBlocklet);
}
} finally {
@@ -248,4 +246,5 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
}
this.dataSize = in.readInt();
}
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
index 0c52297..40acf9e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
@@ -62,8 +62,8 @@ public class ExtendedBlockletWrapperContainer implements Writable {
this.isFallbackJob = isFallbackJob;
}
- public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId)
- throws IOException {
+ public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId,
+ boolean isCountJob) throws IOException {
if (!isFallbackJob) {
int numOfThreads = CarbonProperties.getNumOfThreadsForPruning();
ExecutorService executorService = Executors
@@ -85,8 +85,8 @@ public class ExtendedBlockletWrapperContainer implements Writable {
List<Future<List<ExtendedBlocklet>>> futures = new ArrayList<>();
for (int i = 0; i < split.length; i++) {
end += split[i];
- futures.add(executorService
- .submit(new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId)));
+ futures.add(executorService.submit(
+ new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId, isCountJob)));
start += split[i];
}
executorService.shutdown();
@@ -109,7 +109,8 @@ public class ExtendedBlockletWrapperContainer implements Writable {
} else {
List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) {
- extendedBlocklets.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId));
+ extendedBlocklets
+ .addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId, isCountJob));
}
return extendedBlocklets;
}
@@ -125,18 +126,22 @@ public class ExtendedBlockletWrapperContainer implements Writable {
private String queryId;
+ private boolean isCountJob;
+
public ExtendedBlockletDeserializerThread(int start, int end, String tablePath,
- String queryId) {
+ String queryId, boolean isCountJob) {
this.start = start;
this.end = end;
this.tablePath = tablePath;
this.queryId = queryId;
+ this.isCountJob = isCountJob;
}
@Override public List<ExtendedBlocklet> call() throws Exception {
List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
for (int i = start; i < end; i++) {
- extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId));
+ extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId,
+ isCountJob));
}
return extendedBlocklets;
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 45041e4..56ccabc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.DataMapUtil;
+import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
@@ -412,10 +413,42 @@ m filterExpression
*/
@Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
- List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
+ /**
+ * This method will execute a distributed job(DistributedDataMapJob) to get the count for the
+ * table. If the DistributedDataMapJob fails for some reason then an embedded job is fired to
+ * get the count.
+ */
+ Long getDistributedCount(CarbonTable table,
+ List<PartitionSpec> partitionNames, List<Segment> validSegments) throws IOException {
+ DistributableDataMapFormat dataMapFormat =
+ new DistributableDataMapFormat(table, null, validSegments, new ArrayList<String>(),
+ partitionNames, false, null, false);
+ dataMapFormat.setIsWriteToFile(false);
+ try {
+ DataMapJob dataMapJob =
+ (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
+ if (dataMapJob == null) {
+ throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
+ }
+ return dataMapJob.executeCountJob(dataMapFormat);
+ } catch (Exception e) {
+ LOG.error("Failed to get count from index server. Initializing fallback", e);
+ DataMapJob dataMapJob = DataMapUtil.getEmbeddedJob();
+ return dataMapJob.executeCountJob(dataMapFormat);
+ }
+ }
+
+ List<ExtendedBlocklet> getDistributedBlockRowCount(CarbonTable table,
+ List<PartitionSpec> partitionNames, List<Segment> validSegments,
+ List<Segment> invalidSegments, List<String> segmentsToBeRefreshed) throws IOException {
+ return getDistributedSplit(table, null, partitionNames, validSegments, invalidSegments,
+ segmentsToBeRefreshed, true);
+ }
+
+ private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
List<Segment> validSegments, List<Segment> invalidSegments,
- List<String> segmentsToBeRefreshed) throws IOException {
+ List<String> segmentsToBeRefreshed, boolean isCountJob) throws IOException {
try {
DataMapJob dataMapJob =
(DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
@@ -424,7 +457,7 @@ m filterExpression
}
return DataMapUtil
.executeDataMapJob(table, filterResolverIntf, dataMapJob, partitionNames, validSegments,
- invalidSegments, null, segmentsToBeRefreshed);
+ invalidSegments, null, false, segmentsToBeRefreshed, isCountJob);
} catch (Exception e) {
// Check if fallback is disabled for testing purposes then directly throw exception.
if (CarbonProperties.getInstance().isFallBackDisabled()) {
@@ -432,10 +465,9 @@ m filterExpression
}
LOG.error("Exception occurred while getting splits using index server. Initiating Fall "
+ "back to embedded mode", e);
- return DataMapUtil
- .executeDataMapJob(table, filterResolverIntf,
- DataMapUtil.getEmbeddedJob(), partitionNames, validSegments, invalidSegments, null,
- true, segmentsToBeRefreshed);
+ return DataMapUtil.executeDataMapJob(table, filterResolverIntf,
+ DataMapUtil.getEmbeddedJob(), partitionNames, validSegments,
+ invalidSegments, null, true, segmentsToBeRefreshed, isCountJob);
}
}
@@ -545,7 +577,7 @@ m filterExpression
try {
prunedBlocklets =
getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds,
- invalidSegments, segmentsToBeRefreshed);
+ invalidSegments, segmentsToBeRefreshed, false);
} catch (Exception e) {
// Check if fallback is disabled then directly throw exception otherwise try driver
// pruning.
@@ -580,7 +612,7 @@ m filterExpression
if (distributedCG && dataMapJob != null) {
cgPrunedBlocklets = DataMapUtil
.executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
- segmentIds, invalidSegments, DataMapLevel.CG, true, new ArrayList<String>());
+ segmentIds, invalidSegments, DataMapLevel.CG, new ArrayList<String>());
} else {
cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
}
@@ -616,7 +648,7 @@ m filterExpression
// Prune segments from already pruned blocklets
fgPrunedBlocklets = DataMapUtil
.executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
- segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(), true,
+ segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(),
new ArrayList<String>());
// note that the 'fgPrunedBlocklets' has extra datamap related info compared with
// 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 3b7a800..74a4d6e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -562,15 +563,14 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
try {
- List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
- getDistributedSplit(table, null, partitions, filteredSegment,
- allSegments.getInvalidSegments(), toBeCleanedSegments));
- for (InputSplit extendedBlocklet : extendedBlocklets) {
- CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
+ List<ExtendedBlocklet> extendedBlocklets =
+ getDistributedBlockRowCount(table, partitions, filteredSegment,
+ allSegments.getInvalidSegments(), toBeCleanedSegments);
+ for (ExtendedBlocklet blocklet : extendedBlocklets) {
String filePath = blocklet.getFilePath().replace("\\", "/");
String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
- (long) blocklet.getRowCount());
+ blocklet.getRowCount());
}
} catch (Exception e) {
// Check if fallback is disabled then directly throw exception otherwise try driver
@@ -615,15 +615,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
}
}
} else {
- long totalRowCount = 0L;
+ long totalRowCount;
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
- List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
- getDistributedSplit(table, null, partitions, filteredSegment,
- allSegments.getInvalidSegments(), new ArrayList<String>()));
- for (InputSplit extendedBlocklet : extendedBlocklets) {
- totalRowCount += ((CarbonInputSplit) extendedBlocklet).getRowCount();
- }
+ totalRowCount =
+ getDistributedCount(table, partitions, filteredSegment);
} else {
TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap);
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 01b8824..1fee051 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -65,7 +65,8 @@ class DistributedDataMapJob extends AbstractDataMapJob {
dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
dataMapFormat.setFilterResolverIntf(filterInf)
IndexServer.getClient.getSplits(dataMapFormat)
- .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId)
+ .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat
+ .getQueryId, dataMapFormat.isCountStarJob)
} finally {
val tmpPath = CarbonUtil
.getIndexServerTempPath(dataMapFormat.getCarbonTable.getTablePath,
@@ -106,7 +107,11 @@ class DistributedDataMapJob extends AbstractDataMapJob {
filterInf.getFilterExpression.getFilterExpressionType == ExpressionType.UNKNOWN) {
return filterProcessor.changeUnknownResloverToTrue(tableIdentifer)
}
- return filterInf;
+ filterInf
+ }
+
+ override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
+ IndexServer.getClient.getCount(dataMapFormat).get()
}
}
@@ -122,7 +127,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
dataMapFormat.setIsWriteToFile(false)
dataMapFormat.setFallbackJob()
val splits = IndexServer.getSplits(dataMapFormat).getExtendedBlockets(dataMapFormat
- .getCarbonTable.getTablePath, dataMapFormat.getQueryId)
+ .getCarbonTable.getTablePath, dataMapFormat.getQueryId, dataMapFormat.isCountStarJob)
// Fire a job to clear the cache from executors as Embedded mode does not maintain the cache.
IndexServer.invalidateSegmentCache(dataMapFormat.getCarbonTable, dataMapFormat
.getValidSegmentIds.asScala.toArray)
@@ -130,4 +135,8 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
splits
}
+ override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
+ IndexServer.getCount(dataMapFormat).get()
+ }
+
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
new file mode 100644
index 0000000..4a080fa
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.indexserver
+
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapreduce.{InputSplit, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat}
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+/**
+ * An RDD which will get the count for the table.
+ */
+class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: DistributableDataMapFormat)
+ extends CarbonRDD[(String, String)](ss, Nil) {
+
+ @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
+ .getName)
+
+ override protected def getPreferredLocations(split: Partition): Seq[String] = {
+ if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
+ split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
+ } else {
+ Seq()
+ }
+ }
+
+ override def internalCompute(split: Partition,
+ context: TaskContext): Iterator[(String, String)] = {
+ val attemptId = new TaskAttemptID(DistributedRDDUtils.generateTrackerId,
+ id, TaskType.MAP, split.index, 0)
+ val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
+ val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
+ val numOfThreads = CarbonProperties.getInstance().getNumOfThreadsForExecutorPruning
+ val service = Executors
+ .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
+ implicit val ec: ExecutionContextExecutor = ExecutionContext
+ .fromExecutor(service)
+ val futures = if (inputSplits.length <= numOfThreads) {
+ inputSplits.map {
+ split => generateFuture(Seq(split))
+ }
+ } else {
+ DistributedRDDUtils.groupSplits(inputSplits, numOfThreads).map {
+ splits => generateFuture(splits)
+ }
+ }
+ // scalastyle:off awaitresult
+ val results = Await.result(Future.sequence(futures), Duration.Inf).flatten
+ // scalastyle:on awaitresult
+ val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+ SparkEnv.get.blockManager.blockManagerId.executorId
+ }"
+ val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+ CacheProvider.getInstance().getCarbonCache.getCurrentSize
+ } else {
+ 0L
+ }
+ Iterator((executorIP + "_" + cacheSize.toString, results.map(_._2.toLong).sum.toString))
+ }
+
+ override protected def internalGetPartitions: Array[Partition] = {
+ new DistributedPruneRDD(ss, dataMapFormat).partitions
+ }
+
+ private def generateFuture(split: Seq[InputSplit])
+ (implicit executionContext: ExecutionContext) = {
+ Future {
+ val segments = split.map { inputSplit =>
+ val distributable = inputSplit.asInstanceOf[DataMapDistributableWrapper]
+ distributable.getDistributable.getSegment
+ .setReadCommittedScope(dataMapFormat.getReadCommittedScope)
+ distributable.getDistributable.getSegment
+ }
+ val defaultDataMap = DataMapStoreManager.getInstance
+ .getDataMap(dataMapFormat.getCarbonTable, split.head
+ .asInstanceOf[DataMapDistributableWrapper].getDistributable.getDataMapSchema)
+ defaultDataMap.getBlockRowCount(segments.toList.asJava, dataMapFormat
+ .getPartitions, defaultDataMap).asScala
+ }
+ }
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index d8b9c19..76d33b4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -17,15 +17,12 @@
package org.apache.carbondata.indexserver
-import java.text.SimpleDateFormat
-import java.util.Date
import java.util.concurrent.Executors
import scala.collection.JavaConverters._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.Duration
-import org.apache.commons.lang.StringUtils
import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID}
import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -35,9 +32,8 @@ import org.apache.spark.sql.hive.DistributionUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, DistributableDataMapFormat, TableDataMap}
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat}
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
-import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapper}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
@@ -65,21 +61,13 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
@transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
.getName)
- private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- formatter.format(new Date())
- }
- var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _
- private def groupSplits(xs: Seq[InputSplit], n: Int) = {
- val (quot, rem) = (xs.size / n, xs.size % n)
- val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1))
- (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList
- }
+ var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _
override def internalCompute(split: Partition,
context: TaskContext): Iterator[(String, ExtendedBlockletWrapper)] = {
- val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+ val attemptId = new TaskAttemptID(DistributedRDDUtils.generateTrackerId,
+ id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
if (dataMapFormat.isJobToClearDataMaps) {
@@ -118,7 +106,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
split => generateFuture(Seq(split), attemptContext)
}
} else {
- groupSplits(inputSplits, numOfThreads).map {
+ DistributedRDDUtils.groupSplits(inputSplits, numOfThreads).map {
splits => generateFuture(splits, attemptContext)
}
}
@@ -139,14 +127,13 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
}"
val value = (executorIP + "_" + cacheSize.toString, new ExtendedBlockletWrapper(f.toList
.asJava, dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId,
- dataMapFormat.isWriteToFile))
+ dataMapFormat.isWriteToFile, dataMapFormat.isCountStarJob))
Iterator(value)
}
}
- private def generateFuture(split: Seq[InputSplit],
- attemptContextImpl: TaskAttemptContextImpl)
- (implicit executionContext: ExecutionContext) = {
+ private def generateFuture(split: Seq[InputSplit], attemptContextImpl: TaskAttemptContextImpl)
+ (implicit executionContext: ExecutionContext): Future[Seq[ExtendedBlocklet]] = {
Future {
split.flatMap { inputSplit =>
val blocklets = new java.util.ArrayList[ExtendedBlocklet]()
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 933ec15..4819779 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -16,6 +16,8 @@
*/
package org.apache.carbondata.indexserver
+import java.text.SimpleDateFormat
+import java.util.Date
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
@@ -325,4 +327,15 @@ object DistributedRDDUtils {
}
}
+ def groupSplits(xs: Seq[InputSplit], n: Int): List[Seq[InputSplit]] = {
+ val (quot, rem) = (xs.size / n, xs.size % n)
+ val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1))
+ (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList
+ }
+
+ def generateTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new Date())
+ }
+
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index fdaa3d1..abee487 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -23,6 +23,7 @@ import java.util.UUID
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
@@ -58,6 +59,9 @@ trait ServerInterface {
*/
def invalidateSegmentCache(carbonTable: CarbonTable,
segmentIds: Array[String], jobGroupId: String = ""): Unit
+
+ def getCount(request: DistributableDataMapFormat): LongWritable
+
}
/**
@@ -99,6 +103,21 @@ object IndexServer extends ServerInterface {
})
}
+ def getCount(request: DistributableDataMapFormat): LongWritable = {
+ doAs {
+ if (!request.isFallbackJob) {
+ sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
+ sparkSession.sparkContext
+ .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+ }
+ val splits = new DistributedCountRDD(sparkSession, request).collect()
+ if (!request.isFallbackJob) {
+ DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+ }
+ new LongWritable(splits.map(_._2.toLong).sum)
+ }
+ }
+
def getSplits(request: DistributableDataMapFormat): ExtendedBlockletWrapperContainer = {
doAs {
if (!request.isFallbackJob) {