You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:29 UTC

[28/50] [abbrv] carbondata git commit: [CARBONDATA-2428] Support flat folder for managed carbon table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 9d0c933..b76722b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -279,7 +279,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     this.carbonDataFileName = CarbonTablePath
         .getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
-            "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+            "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
     this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
         + carbonDataFileName;
     try {
@@ -368,7 +368,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath
           .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
               model.getBucketId(), model.getTaskExtension(),
-              "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+              "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
       indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS);
     } else {
       // randomly choose a temp location for index file
@@ -378,7 +378,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       indexFileName = chosenTempLocation + File.separator + CarbonTablePath
           .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
               model.getBucketId(), model.getTaskExtension(),
-              "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+              "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
     }
 
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 0ea7223..da77cf6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -601,7 +601,9 @@ public final class CarbonLoaderUtil {
     long sizePerNode = 0;
     long totalFileSize = 0;
     if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
-      sizePerNode = blockInfos.size() / noofNodes;
+      if (blockInfos.size() > 0) {
+        sizePerNode = blockInfos.size() / noofNodes;
+      }
       sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
     } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy
         || BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index f6406c7..4bfadce 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -37,15 +37,18 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonTaskInfo;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
@@ -164,12 +167,15 @@ public class SearchRequestHandler {
     Objects.requireNonNull(datamap);
     List<Segment> segments = new LinkedList<>();
     HashMap<String, Integer> uniqueSegments = new HashMap<>();
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(
+            CarbonTablePath.getMetadataPath(table.getTablePath()));
     for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      String segmentId = split.getSegmentId();
+      String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
       if (uniqueSegments.get(segmentId) == null) {
-        segments.add(Segment.toSegment(
-                segmentId,
-                new LatestFilesReadCommittedScope(table.getTablePath(), segmentId)));
+        segments.add(Segment.toSegment(segmentId,
+            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
+                loadMetadataDetails)));
         uniqueSegments.put(segmentId, 1);
       } else {
         uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 4653445..bd622f0 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -128,7 +128,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
     segmentDir = CarbonTablePath.getSegmentPath(
         carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
-    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0", segmentId);
   }
 
   private void initializeAtFirstRow() throws IOException, InterruptedException {