You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:29 UTC
[28/50] [abbrv] carbondata git commit: [CARBONDATA-2428] Support flat
folder for managed carbon table
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 9d0c933..b76722b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -279,7 +279,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
this.carbonDataFileName = CarbonTablePath
.getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
- "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+ "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
+ carbonDataFileName;
try {
@@ -368,7 +368,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath
.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
- "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+ "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS);
} else {
// randomly choose a temp location for index file
@@ -378,7 +378,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
indexFileName = chosenTempLocation + File.separator + CarbonTablePath
.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
- "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+ "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
}
CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 0ea7223..da77cf6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -601,7 +601,9 @@ public final class CarbonLoaderUtil {
long sizePerNode = 0;
long totalFileSize = 0;
if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
- sizePerNode = blockInfos.size() / noofNodes;
+ if (blockInfos.size() > 0) {
+ sizePerNode = blockInfos.size() / noofNodes;
+ }
sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
} else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy
|| BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index f6406c7..4bfadce 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -37,15 +37,18 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonTaskInfo;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonRecordReader;
@@ -164,12 +167,15 @@ public class SearchRequestHandler {
Objects.requireNonNull(datamap);
List<Segment> segments = new LinkedList<>();
HashMap<String, Integer> uniqueSegments = new HashMap<>();
+ LoadMetadataDetails[] loadMetadataDetails =
+ SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(table.getTablePath()));
for (CarbonInputSplit split : mbSplit.getAllSplits()) {
- String segmentId = split.getSegmentId();
+ String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
if (uniqueSegments.get(segmentId) == null) {
- segments.add(Segment.toSegment(
- segmentId,
- new LatestFilesReadCommittedScope(table.getTablePath(), segmentId)));
+ segments.add(Segment.toSegment(segmentId,
+ new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
+ loadMetadataDetails)));
uniqueSegments.put(segmentId, 1);
} else {
uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 4653445..bd622f0 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -128,7 +128,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
segmentDir = CarbonTablePath.getSegmentPath(
carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
- fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+ fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0", segmentId);
}
private void initializeAtFirstRow() throws IOException, InterruptedException {