You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text copy.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/14 05:57:46 UTC
carbondata git commit: [CARBONDATA-2431] Fixed an issue where incremental data added after
external table creation was not reflected in select queries.
Repository: carbondata
Updated Branches:
refs/heads/master 2881c6bbc -> f1a6c7cf5
[CARBONDATA-2431] Fixed an issue where incremental data added after external table creation was not reflected in select queries.
This closes #2262
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f1a6c7cf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f1a6c7cf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f1a6c7cf
Branch: refs/heads/master
Commit: f1a6c7cf548cd33ef26bd99f26c7fcf7e367c9c7
Parents: 2881c6b
Author: rahulforallp <ra...@knoldus.in>
Authored: Thu May 3 14:11:12 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon May 14 11:27:33 2018 +0530
----------------------------------------------------------------------
.../core/datamap/DataMapStoreManager.java | 19 +--
.../apache/carbondata/core/datamap/Segment.java | 7 +
.../LatestFilesReadCommittedScope.java | 32 ++++-
.../ReadCommittedIndexFileSnapShot.java | 10 +-
.../core/readcommitter/ReadCommittedScope.java | 5 +
.../TableStatusReadCommittedScope.java | 13 ++
.../core/statusmanager/SegmentRefreshInfo.java | 65 +++++++++
.../hadoop/api/CarbonTableInputFormat.java | 10 +-
.../TestNonTransactionalCarbonTable.scala | 136 +++++++++++++++++++
9 files changed, 282 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index a3be26a..072b86e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorage
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonSessionInfo;
@@ -454,7 +455,7 @@ public final class DataMapStoreManager {
// This map stores the latest segment refresh time.So in case of update/delete we check the
// time against this map.
- private Map<String, Long> segmentRefreshTime = new HashMap<>();
+ private Map<String, SegmentRefreshInfo> segmentRefreshTime = new HashMap<>();
// This map keeps the manual refresh entries from users. It is mainly used for partition
// altering.
@@ -465,23 +466,25 @@ public final class DataMapStoreManager {
SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
- segmentRefreshTime.put(updateVO.getSegmentId(), updateVO.getCreatedOrUpdatedTimeStamp());
+ segmentRefreshTime.put(updateVO.getSegmentId(),
+ new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0));
}
}
- public boolean isRefreshNeeded(String segmentId, SegmentUpdateStatusManager statusManager) {
- UpdateVO updateVO = statusManager.getInvalidTimestampRange(segmentId);
+ public boolean isRefreshNeeded(Segment seg, UpdateVO updateVo) throws IOException {
+ SegmentRefreshInfo segmentRefreshInfo =
+ seg.getSegmentRefreshInfo(updateVo);
+ String segmentId = seg.getSegmentNo();
if (segmentRefreshTime.get(segmentId) == null) {
- segmentRefreshTime.put(segmentId, updateVO.getCreatedOrUpdatedTimeStamp());
+ segmentRefreshTime.put(segmentId, segmentRefreshInfo);
return true;
}
if (manualSegmentRefresh.get(segmentId) != null && manualSegmentRefresh.get(segmentId)) {
manualSegmentRefresh.put(segmentId, false);
return true;
}
- Long updateTimestamp = updateVO.getLatestUpdateTimestamp();
- boolean isRefresh =
- updateTimestamp != null && (updateTimestamp > segmentRefreshTime.get(segmentId));
+
+ boolean isRefresh = segmentRefreshInfo.compare(segmentRefreshTime.get(segmentId));
if (isRefresh) {
segmentRefreshTime.remove(segmentId);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 476f9da..85c7176 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -25,8 +25,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -111,6 +113,11 @@ public class Segment implements Serializable {
return readCommittedScope.getCommittedIndexFile(this);
}
+ public SegmentRefreshInfo getSegmentRefreshInfo(UpdateVO updateVo) // refresh info for this segment, obtained from its ReadCommittedScope
+ throws IOException { // propagates IOException from the scope
+ return readCommittedScope.getCommitedSegmentRefreshInfo(this, updateVo); // NOTE(review): may be null with LatestFilesReadCommittedScope when the segment is absent from its snapshot
+ }
+
public String getSegmentNo() {
return segmentNo;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index de7f8a9..2306330 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -28,7 +28,9 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -43,7 +45,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
private LoadMetadataDetails[] loadMetadataDetails;
- public LatestFilesReadCommittedScope(String path) {
+ public LatestFilesReadCommittedScope(String path) {
this.carbonFilePath = path;
try {
takeCarbonIndexFileSnapShot();
@@ -104,6 +106,20 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
return indexFileStore;
}
+ @Override public SegmentRefreshInfo getCommitedSegmentRefreshInfo( // looks up the segment in the current index-file snapshot
+ Segment segment, UpdateVO updateVo) throws IOException { // updateVo is not used by this scope
+ Map<String, SegmentRefreshInfo> snapShot =
+ readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap(); // populated when the index-file snapshot is taken
+ String segName; // key into the snapshot map
+ if (segment.getSegmentNo() != null) {
+ segName = segment.getSegmentNo();
+ } else {
+ segName = segment.getSegmentFileName(); // fall back to the segment file name when no segment number is set
+ }
+ SegmentRefreshInfo segmentRefreshInfo = snapShot.get(segName); // NOTE(review): null if segName is not in the snapshot; DataMapStoreManager.isRefreshNeeded dereferences the result without a null check
+ return segmentRefreshInfo;
+ }
+
private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
if (indexFilePath.contains("/Fact/Part0/Segment_")) {
// This is CarbonFile case where the Index files are present inside the Segment Folder
@@ -128,6 +144,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
throw new IOException("No files are present in the table location :" + carbonFilePath);
}
Map<String, List<String>> indexFileStore = new HashMap<>();
+ Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();
if (file.isDirectory()) {
CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
for (int i = 0; i < carbonIndexFiles.length; i++) {
@@ -139,18 +156,29 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
getSegmentID(carbonIndexFiles[i].getName(), carbonIndexFiles[i].getAbsolutePath());
// TODO. During Partition table handling, place Segment File Name.
List<String> indexList;
+ SegmentRefreshInfo segmentRefreshInfo;
if (indexFileStore.get(segId) == null) {
indexList = new ArrayList<>(1);
+ segmentRefreshInfo =
+ new SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0);
+ segmentTimestampUpdaterMap.put(segId, segmentRefreshInfo);
} else {
// Entry is already present.
indexList = indexFileStore.get(segId);
+ segmentRefreshInfo = segmentTimestampUpdaterMap.get(segId);
}
indexList.add(carbonIndexFiles[i].getAbsolutePath());
+ if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < carbonIndexFiles[i]
+ .getLastModifiedTime()) {
+ segmentRefreshInfo
+ .setSegmentUpdatedTimestamp(carbonIndexFiles[i].getLastModifiedTime());
+ }
indexFileStore.put(segId, indexList);
+ segmentRefreshInfo.setCountOfFileInSegment(indexList.size());
}
}
ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot =
- new ReadCommittedIndexFileSnapShot(indexFileStore);
+ new ReadCommittedIndexFileSnapShot(indexFileStore, segmentTimestampUpdaterMap);
this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot;
prepareLoadMetadata();
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
index 3e8e04f..70ca6ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
/**
* This class is going to save the the Index files which are taken snapshot
@@ -36,12 +37,19 @@ public class ReadCommittedIndexFileSnapShot implements Serializable {
* Segment Numbers are mapped with list of Index Files.
*/
private Map<String, List<String>> segmentIndexFileMap;
+ private Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap;
- public ReadCommittedIndexFileSnapShot(Map<String, List<String>> segmentIndexFileMap) {
+ public ReadCommittedIndexFileSnapShot(Map<String, List<String>> segmentIndexFileMap,
+ Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap) {
this.segmentIndexFileMap = segmentIndexFileMap;
+ this.segmentTimestampUpdaterMap = segmentTimestampUpdaterMap;
}
public Map<String, List<String>> getSegmentIndexFileMap() {
return segmentIndexFileMap;
}
+
+ public Map<String, SegmentRefreshInfo> getSegmentTimestampUpdaterMap() { // segment id -> refresh info captured with this snapshot
+ return segmentTimestampUpdaterMap; // the live map, not a copy
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index 9ae462b..6ff4b89 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -23,7 +23,9 @@ import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
/**
* ReadCommitted interface that defines a read scope.
@@ -43,5 +45,8 @@ public interface ReadCommittedScope extends Serializable {
*/
public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException ;
+ public SegmentRefreshInfo getCommitedSegmentRefreshInfo( // refresh info used by DataMapStoreManager.isRefreshNeeded to detect changed segments
+ Segment segment, UpdateVO updateVo) throws IOException; // updateVo may be null (implementations null-check or ignore it); the result may be null for unknown segments
+
public void takeCarbonIndexFileSnapShot() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 41ce31c..91ebd41 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -25,7 +25,9 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -77,6 +79,17 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
return indexFiles;
}
+ public SegmentRefreshInfo getCommitedSegmentRefreshInfo(Segment segment, UpdateVO updateVo) // builds refresh info from update status only; segment is not consulted
+ throws IOException { // NOTE(review): lacks @Override, unlike the LatestFilesReadCommittedScope implementation
+ SegmentRefreshInfo segmentRefreshInfo;
+ if (updateVo != null) { // NOTE(review): isRefreshNeeded previously compared getLatestUpdateTimestamp(); confirm getCreatedOrUpdatedTimeStamp() advances after update/delete
+ segmentRefreshInfo = new SegmentRefreshInfo(updateVo.getCreatedOrUpdatedTimeStamp(), 0); // file count is not tracked in this scope, so it is always 0
+ } else {
+ segmentRefreshInfo = new SegmentRefreshInfo(0L, 0); // no update record: timestamp 0
+ }
+ return segmentRefreshInfo;
+ }
+
@Override public void takeCarbonIndexFileSnapShot() throws IOException {
// Only Segment Information is updated.
// File information will be fetched on the fly according to the fecthed segment info.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
new file mode 100644
index 0000000..11fb73f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+import java.io.Serializable;
+
+public class SegmentRefreshInfo implements Serializable {
+ // Per-segment refresh marker: a timestamp plus an index-file count (0 where the count is not tracked).
+ private Long segmentUpdatedTimestamp; // latest index-file mtime, or the update-status timestamp
+ private Integer countOfFileInSegment; // number of index files seen for the segment
+ /** Creates a marker; both values must be non-null because compare() and hashCode() dereference them. */
+ public SegmentRefreshInfo(Long segmentUpdatedTimestamp, Integer countOfFileInSegment) {
+ this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
+ this.countOfFileInSegment = countOfFileInSegment;
+ }
+ // Accessors; the setters update the marker while an index-file snapshot is being built.
+ public Long getSegmentUpdatedTimestamp() {
+ return segmentUpdatedTimestamp;
+ }
+
+ public void setSegmentUpdatedTimestamp(Long segmentUpdatedTimestamp) {
+ this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
+ }
+
+ public Integer getCountOfFileInSegment() {
+ return countOfFileInSegment;
+ }
+
+ public void setCountOfFileInSegment(Integer countOfFileInSegment) {
+ this.countOfFileInSegment = countOfFileInSegment;
+ }
+ /** Returns true when a refresh is needed (this timestamp is later than o's, or the file counts differ); false if o is not a SegmentRefreshInfo. Not an ordering despite its name. */
+ public boolean compare(Object o) {
+ if (!(o instanceof SegmentRefreshInfo)) return false;
+ // NOTE(review): '>' auto-unboxes both timestamps and countOfFileInSegment.equals() dereferences this count, so null values throw NullPointerException.
+ SegmentRefreshInfo that = (SegmentRefreshInfo) o;
+ // A later timestamp or an added/removed index file both mean the segment needs a refresh.
+ if (segmentUpdatedTimestamp > that.segmentUpdatedTimestamp || !countOfFileInSegment
+ .equals(that.countOfFileInSegment)) {
+ return true;
+ }
+ return false;
+ }
+ // NOTE(review): hashCode() is overridden without equals(), so instances with equal fields are still unequal; add equals() or drop this override.
+ @Override public int hashCode() {
+ int result = segmentUpdatedTimestamp.hashCode();
+ result = 31 * result + countOfFileInSegment.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index a32e17a..1db3138 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -230,13 +230,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
readCommittedScope);
// Clean the updated segments from memory if the update happens on segments
List<Segment> toBeCleanedSegments = new ArrayList<>();
- for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
- .getUpdateStatusDetails()) {
+ for (Segment filteredSegment : filteredSegmentToAccess) {
boolean refreshNeeded =
DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
- .isRefreshNeeded(segmentUpdateDetail.getSegmentName(), updateStatusManager);
+ .isRefreshNeeded(filteredSegment,
+ updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
if (refreshNeeded) {
- toBeCleanedSegments.add(new Segment(segmentUpdateDetail.getSegmentName(), null));
+ toBeCleanedSegments.add(filteredSegment);
}
}
// Clean segments if refresh is needed
@@ -246,6 +246,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
toBeCleanedSegments.add(segment);
}
}
+
+
if (toBeCleanedSegments.size() > 0) {
DataMapStoreManager.getInstance()
.clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 86fda21..58ce5fa 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -198,6 +198,40 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
}
+ // Writes `rows` CSV rows ("robot" + i, i, i / 2.0) to writerPath as non-transactional sdk output; every call uses uniqueIdentifier 123, so repeated calls write files with the same UUID.
+ def buildTestDataWithSameUUID(rows: Int,
+ persistSchema: Boolean, // NOTE(review): unused
+ options: util.Map[String, String], // NOTE(review): unused
+ sortColumns: List[String]): Any = { // passed to the writer as sort columns
+ val schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString()
+ // The JSON above declares the writer schema: name (string), age (int), height (double).
+ try {
+ val builder = CarbonWriter.builder()
+ val writer =
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
+ .isTransactionalTable(false)
+ .sortBy(sortColumns.toArray)
+ .uniqueIdentifier(
+ 123).withBlockSize(2) // fixed identifier: the point of this helper
+ .buildWriterForCSVInput()
+ var i = 0
+ while (i < rows) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close() // NOTE(review): skipped if write() throws; consider try/finally
+ } catch {
+ case ex: Exception => throw new RuntimeException(ex)
+ // NOTE(review): the catch-all below silently swallows any remaining Throwable (e.g. Errors) and returns None.
+ case _ => None
+ }
+ }
def cleanTestData() = {
FileUtils.deleteDirectory(new File(writerPath))
@@ -229,6 +263,44 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS sdkOutputTable")
}
+ test(
+ "Read two sdk writer outputs before and after deleting the existing files and creating new " +
+ "files with same schema and UUID") {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildTestDataWithSameUUID(3, false, null, List("name"))
+ assert(new File(writerPath).exists())
+ // Create the external table over the 3-row output written with a fixed UUID.
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ // First read: the original 3 rows.
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+ new File(writerPath).listFiles().map(x => LOGGER.audit(x.getName +" : "+x.lastModified())) // log file names and mtimes for debugging
+ FileUtils.deleteDirectory(new File(writerPath)) // replace the files with 4 rows written under the same UUID
+ // Thread.sleep is required because deletion and creation of the new files can sometimes
+ // happen within the same timestamp, which would leave the segment's refresh timestamp unchanged.
+ Thread.sleep(1000)
+ assert(!new File(writerPath).exists())
+ buildTestDataWithSameUUID(4, false, null, List("name"))
+ new File(writerPath).listFiles().map(x => LOGGER.audit(x.getName +" : "+x.lastModified()))
+ checkAnswer(sql("select * from sdkOutputTable"), Seq( // the existing table must see the new 4 rows without being recreated
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot3", 3, 1.5)))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
test("test create external table with sort columns") {
buildTestDataWithSortColumns()
assert(new File(writerPath).exists())
@@ -638,9 +710,40 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
Row("robot1", 1, 0.5),
Row("robot2", 2, 1.0)))
+ buildTestDataWithSameUUID(3, false, null, List("name"))
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
+ buildTestDataWithSameUUID(3, false, null, List("name"))
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+
//test filter query
checkAnswer(sql("select * from sdkOutputTable where age = 1"), Seq(
Row("robot1", 1, 0.5),
+ Row("robot1", 1, 0.5),
+ Row("robot1", 1, 0.5),
Row("robot1", 1, 0.5)))
// test the default sort column behavior in Nontransactional table
@@ -653,6 +756,39 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
+ test(
+ "Read two sdk writer outputs before and after deleting the existing files and creating new " +
+ "files with same schema") {
+ buildTestDataSingleFile()
+ assert(new File(writerPath).exists())
+ // Create the external table over the initial sdk output (3 rows expected below).
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0)))
+ // Replace the files on disk; the existing table must see the new 4 rows without being recreated.
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildTestData(4, false, null)
+ // NOTE(review): unlike the same-UUID test above there is no Thread.sleep here; confirm this cannot flake if the new files get the same mtime.
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(
+ Row("robot0", 0, 0.0),
+ Row("robot1", 1, 0.5),
+ Row("robot2", 2, 1.0),
+ Row("robot3", 3, 1.5)))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
test("test bad records form sdk writer") {
//1. Action = FORCE