You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/04 12:33:25 UTC
[26/50] [abbrv] carbondata git commit: [REBASE] resolve conflict
after rebasing to master
[REBASE] resolve conflict after rebasing to master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6216294c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6216294c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6216294c
Branch: refs/heads/carbonstore-rebase5
Commit: 6216294c1e28c1db05e572f0aac3a991d345e085
Parents: 3fdd5d0
Author: Jacky Li <ja...@qq.com>
Authored: Tue Feb 27 08:51:25 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sun Mar 4 20:32:12 2018 +0800
----------------------------------------------------------------------
.../core/datamap/dev/AbstractDataMapWriter.java | 5 ++--
.../core/datamap/dev/DataMapFactory.java | 2 +-
.../blockletindex/BlockletDataMapFactory.java | 2 +-
.../SegmentUpdateStatusManager.java | 9 +-----
.../datamap/examples/MinMaxDataMapFactory.java | 5 ++--
.../datamap/examples/MinMaxDataWriter.java | 7 +++--
.../testsuite/datamap/CGDataMapTestCase.scala | 26 ++++++++--------
.../testsuite/datamap/DataMapWriterSuite.scala | 19 ++++++------
.../testsuite/datamap/FGDataMapTestCase.scala | 31 +++++++++-----------
.../iud/DeleteCarbonTableTestCase.scala | 2 +-
.../TestInsertAndOtherCommandConcurrent.scala | 14 +++++----
.../StandardPartitionTableCleanTestCase.scala | 12 ++++----
.../carbondata/spark/util/DataLoadingUtil.scala | 2 +-
.../datamap/DataMapWriterListener.java | 2 +-
.../processing/merger/CarbonDataMergerUtil.java | 8 +----
.../merger/CompactionResultSortProcessor.java | 4 +--
.../merger/RowResultMergerProcessor.java | 5 ++--
.../partition/spliter/RowResultProcessor.java | 5 ++--
.../util/CarbonDataProcessorUtil.java | 4 +--
.../processing/util/CarbonLoaderUtil.java | 9 ------
20 files changed, 73 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
index bcc9bad..de6dcb1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap.dev;
import java.io.IOException;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -35,10 +36,10 @@ public abstract class AbstractDataMapWriter {
protected String writeDirectoryPath;
- public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
+ public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
String writeDirectoryPath) {
this.identifier = identifier;
- this.segmentId = segmentId;
+ this.segmentId = segment.getSegmentNo();
this.writeDirectoryPath = writeDirectoryPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index df5670d..50ac279 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -39,7 +39,7 @@ public interface DataMapFactory<T extends DataMap> {
/**
* Return a new write for this datamap
*/
- AbstractDataMapWriter createWriter(Segment segment);
+ AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath);
/**
* Get the datamap for segmentid
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index efe2b71..ee849bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -72,7 +72,7 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
}
@Override
- public AbstractDataMapWriter createWriter(Segment segment) {
+ public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
throw new UnsupportedOperationException("not implemented");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 94a4243..39eb262 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -260,13 +260,8 @@ public class SegmentUpdateStatusManager {
/**
* Returns all delta file paths of specified block
- *
- * @param tupleId
- * @param extension
- * @return
- * @throws Exception
*/
- public List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
+ private List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
try {
String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
String completeBlockName = CarbonTablePath.addDataPartPrefix(
@@ -405,10 +400,8 @@ public class SegmentUpdateStatusManager {
public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
String segmentPath = CarbonTablePath.getSegmentPath(
identifier.getTablePath(), segmentId.getSegmentNo());
-
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-
for (SegmentUpdateDetails block : updateDetails) {
if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
(block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index 266c107..4ef74a7 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -52,9 +52,8 @@ public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
* @param segment
* @return
*/
- @Override public AbstractDataMapWriter createWriter(Segment segment) {
- return new MinMaxDataWriter(identifier, segment.getSegmentNo(),
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
+ @Override public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+ return new MinMaxDataWriter(identifier, segment, writeDirectoryPath);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index fe0bbcf..5046182 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -52,11 +53,11 @@ public class MinMaxDataWriter extends AbstractDataMapWriter {
private String dataWritePath;
- public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String segmentId,
+ public MinMaxDataWriter(AbsoluteTableIdentifier identifier, Segment segment,
String dataWritePath) {
- super(identifier, segmentId, dataWritePath);
+ super(identifier, segment, dataWritePath);
this.identifier = identifier;
- this.segmentId = segmentId;
+ this.segmentId = segment.getSegmentNo();
this.dataWritePath = dataWritePath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 4b6f231..1cbbcb4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
import org.apache.carbondata.core.datastore.compression.SnappyCompressor
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.scan.expression.Expression
@@ -62,16 +62,16 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
/**
* Return a new write for this datamap
*/
- override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
- new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+ override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+ new CGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
}
/**
* Get the datamap for segmentid
*/
- override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
+ override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainDataMap] = {
val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -108,9 +108,9 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
*
* @return
*/
- override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -125,7 +125,7 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
/**
* Clears datamap of the segment
*/
- override def clear(segmentId: String): Unit = {
+ override def clear(segment: Segment): Unit = {
}
@@ -175,7 +175,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
override def prune(
filterExp: FilterResolverIntf,
segmentProperties: SegmentProperties,
- partitions: java.util.List[String]): java.util.List[Blocklet] = {
+ partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
val expression = filterExp.getFilterExpression
getEqualToExpression(expression, buffer)
@@ -184,7 +184,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
}
val meta = findMeta(value(0).getBytes)
meta.map { f=>
- new Blocklet(f._1, f._2+"")
+ new Blocklet(f._1, f._2 + "")
}.asJava
}
@@ -219,10 +219,10 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
}
class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segmentId: String,
+ segment: Segment,
dataWritePath: String,
dataMapName: String)
- extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+ extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
var currentBlockId: String = null
val cgwritepath = dataWritePath + "/" +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 2f8a1d1..7e93959 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -20,21 +20,19 @@ package org.apache.carbondata.spark.testsuite.datamap
import java.util
import scala.collection.JavaConverters._
+
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.Event
@@ -49,15 +47,16 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
override def fireEvent(event: Event): Unit = ???
- override def clear(segmentId: Segment): Unit = {}
+ override def clear(segment: Segment): Unit = {}
override def clear(): Unit = {}
- override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+ override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
- override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
+ override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
- override def createWriter(segmentId: Segment): AbstractDataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+ override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter =
+ DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
@@ -175,9 +174,9 @@ object DataMapWriterSuite {
var callbackSeq: Seq[String] = Seq[String]()
- def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+ def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
dataWritePath: String) =
- new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+ new AbstractDataMapWriter(identifier, segment, dataWritePath) {
override def onPageAdded(
blockletId: Int,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index d1bb65f..9c8cc15 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory}
import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
import org.apache.carbondata.core.datastore.compression.SnappyCompressor
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.scan.expression.Expression
@@ -62,16 +62,16 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
/**
* Return a new write for this datamap
*/
- override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
- new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+ override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+ new FGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
}
/**
* Get the datamap for segmentid
*/
- override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = {
+ override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainDataMap] = {
val file = FileFactory
- .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -99,9 +99,9 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
*
* @return
*/
- override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
- val file = FileFactory
- .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
+ val file = FileFactory.getCarbonFile(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -112,7 +112,6 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
}.toList.asJava
}
-
/**
*
* @param event
@@ -124,7 +123,7 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
/**
* Clears datamap of the segment
*/
- override def clear(segmentId: String): Unit = {
+ override def clear(segment: Segment): Unit = {
}
/**
@@ -173,7 +172,7 @@ class FGDataMap extends AbstractFineGrainDataMap {
override def prune(
filterExp: FilterResolverIntf,
segmentProperties: SegmentProperties,
- partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+ partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
val expression = filterExp.getFilterExpression
getEqualToExpression(expression, buffer)
@@ -187,7 +186,7 @@ class FGDataMap extends AbstractFineGrainDataMap {
}
private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
- value: Array[Byte]): Option[FineGrainBlocklet] = {
+ value: Array[Byte]): Option[Blocklet] = {
val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
val obj = new ObjectInputStream(outputStream)
@@ -211,12 +210,10 @@ class FGDataMap extends AbstractFineGrainDataMap {
pg.setRowId(f._2(p._2).toArray)
pg
}
- pages
Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
} else {
None
}
-
}
private def findMeta(value: Array[Byte]) = {
@@ -249,8 +246,8 @@ class FGDataMap extends AbstractFineGrainDataMap {
}
class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segmentId: String, dataWriterPath: String, dataMapName: String)
- extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+ segment: Segment, dataWriterPath: String, dataMapName: String)
+ extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
var currentBlockId: String = null
val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index d05f022..510903a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -194,7 +194,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("delete from update_status_files where age=5").show()
val carbonTable = CarbonEnv
.getCarbonTable(Some("iud_db"), "update_status_files")(sqlContext.sparkSession)
- val metaPath = carbonTable.getMetaDataFilepath
+ val metaPath = carbonTable.getMetadataPath
val files = FileFactory.getCarbonFile(metaPath)
val result = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.getClass
if(result.getCanonicalName.contains("CarbonFileMetastore")) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 5550358..b39c44c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -269,7 +269,11 @@ object Global {
class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
- override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+ private var identifier: AbsoluteTableIdentifier = _
+
+ override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = {
+ this.identifier = identifier
+ }
override def fireEvent(event: Event): Unit = ???
@@ -277,12 +281,12 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
override def clear(): Unit = {}
- override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+ override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
- override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
+ override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
- override def createWriter(segmentId: Segment): AbstractDataMapWriter = {
- new AbstractDataMapWriter {
+ override def createWriter(segment: Segment, writeDirectoryPath: String): AbstractDataMapWriter = {
+ new AbstractDataMapWriter(identifier, segment, writeDirectoryPath) {
override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
override def onBlockletEnd(blockletId: Int): Unit = { }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index f238d2b..cfc6983 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -52,14 +52,12 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int, indexes: Int): Unit = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
- carbonTable.getTablePath)
- val partitions = CarbonFilters
- .getPartitions(Seq.empty,
- sqlContext.sparkSession,
- TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
+ val partitions = CarbonFilters.getPartitions(
+ Seq.empty,
+ sqlContext.sparkSession,
+ TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
assert(partitions.get.length == partition)
- val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath)
+ val details = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
val segLoad = details.find(_.getLoadName.equals(segmentId)).get
val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile)
assert(seg.getIndexFiles.size == indexes)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index cee40c8..49e4420 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -441,7 +441,7 @@ object DataLoadingUtil {
private def isUpdationRequired(isForceDeletion: Boolean,
carbonTable: CarbonTable,
- absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+ absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = {
val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
// Delete marked loads
val isUpdationRequired =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 5083ab5..1104229 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,7 @@ public class DataMapWriterListener {
}
List<String> columns = factory.getMeta().getIndexedColumns();
List<AbstractDataMapWriter> writers = registry.get(columns);
- AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null));
+ AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
if (writers != null) {
writers.add(writer);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 4579c85..1ab803b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1005,14 +1005,8 @@ public final class CarbonDataMergerUtil {
/**
* This method traverses Update Delta Files inside the seg and return true
* if UpdateDelta Files are more than IUD Compaction threshold.
- *
- * @param seg
- * @param identifier
- * @param segmentUpdateStatusManager
- * @param numberDeltaFilesThreshold
- * @return
*/
- public static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
+ private static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
AbsoluteTableIdentifier identifier, SegmentUpdateStatusManager segmentUpdateStatusManager,
int numberDeltaFilesThreshold) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index bce1b33..7435d73 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -406,8 +406,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
+ carbonLoadModel.getFactTimeStamp() + ".tmp";
} else {
carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(),
- tableName, carbonLoadModel.getSegmentId());
+ .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName,
+ carbonLoadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 4aca13a..2616def 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -76,9 +76,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
.getFactTimeStamp() + ".tmp";
} else {
- carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getSegmentId());
+ carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+ loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 92db4c5..221697f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -47,9 +47,8 @@ public class RowResultProcessor {
CarbonDataProcessorUtil.createLocations(tempStoreLocation);
this.segmentProperties = segProp;
String tableName = carbonTable.getTableName();
- String carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getSegmentId());
+ String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+ loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
segProp, tableName, tempStoreLocation, carbonStoreLocation);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 64e50b0..efd715c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -393,8 +392,7 @@ public final class CarbonDataProcessorUtil {
*
* @return data directory path
*/
- public static String createCarbonStoreLocation(String factStoreLocation,
- String databaseName, String tableName, String segmentId) {
+ public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) {
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6216294c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1f93ba1..b7aadd0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -459,15 +459,6 @@ public final class CarbonLoaderUtil {
}
- public static String readCurrentTime() {
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
- String date = null;
-
- date = sdf.format(new Date());
-
- return date;
- }
-
public static boolean isValidEscapeSequence(String escapeChar) {
return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) ||
escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||