You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/13 20:01:18 UTC
[1/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter
interface
Repository: carbondata
Updated Branches:
refs/heads/master 85cbad246 -> f089287ce
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index adb97ae..5edd675 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.FileFooter3;
+import org.apache.carbondata.processing.store.TablePage;
import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
@@ -57,22 +58,25 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
/**
* persist the page data to be written in the file
*/
- private DataWriterHolder dataWriterHolder;
+ private BlockletDataHolder blockletDataHolder;
- private long blockletSize;
+ /**
+ * Threshold of blocklet size in bytes (configured in MB, converted in the constructor)
+ */
+ private long blockletSizeThreshold;
public CarbonFactDataWriterImplV3(CarbonDataWriterVo dataWriterVo) {
super(dataWriterVo);
- blockletSize = Long.parseLong(CarbonProperties.getInstance()
+ blockletSizeThreshold = Long.parseLong(CarbonProperties.getInstance()
.getProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB,
CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE))
* CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
* CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
- if (blockletSize > fileSizeInBytes) {
- blockletSize = fileSizeInBytes;
- LOGGER.info("Blocklet size configure for table is: " + blockletSize);
+ if (blockletSizeThreshold > fileSizeInBytes) {
+ blockletSizeThreshold = fileSizeInBytes;
+ LOGGER.info("Blocklet size configure for table is: " + blockletSizeThreshold);
}
- dataWriterHolder = new DataWriterHolder();
+ blockletDataHolder = new BlockletDataHolder();
}
@Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
@@ -100,88 +104,118 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
}
/**
- * Below method will be used to write one table page data
+ * Adds one table page to the current blocklet and writes the blocklet to file when the size threshold is reached; invoked by Consumer
+ * @param tablePage
*/
- @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+ @Override public void writeTablePage(TablePage tablePage)
throws CarbonDataWriterException {
// condition for writting all the pages
- if (!encodedTablePage.isLastPage()) {
+ if (!tablePage.isLastPage()) {
boolean isAdded = false;
// check if size more than blocklet size then write the page to file
- if (dataWriterHolder.getSize() + encodedTablePage.getEncodedSize() >= blockletSize) {
- // if one page size is more than blocklet size
- if (dataWriterHolder.getEncodedTablePages().size() == 0) {
+ if (blockletDataHolder.getSize() + tablePage.getEncodedTablePage().getEncodedSize() >=
+ blockletSizeThreshold) {
+ // if blocklet size exceeds threshold, write blocklet data
+ if (blockletDataHolder.getEncodedTablePages().size() == 0) {
isAdded = true;
- dataWriterHolder.addPage(encodedTablePage);
+ addPageData(tablePage);
}
- LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
- + " :Rows Added: " + dataWriterHolder.getTotalRows());
+ LOGGER.info("Number of Pages for blocklet is: " + blockletDataHolder.getNumberOfPagesAdded()
+ + " :Rows Added: " + blockletDataHolder.getTotalRows());
+
// write the data
writeBlockletToFile();
+
}
if (!isAdded) {
- dataWriterHolder.addPage(encodedTablePage);
+ addPageData(tablePage);
}
} else {
//for last blocklet check if the last page will exceed the blocklet size then write
// existing pages and then last page
- if (encodedTablePage.getPageSize() > 0) {
- dataWriterHolder.addPage(encodedTablePage);
+
+ if (tablePage.getPageSize() > 0) {
+ addPageData(tablePage);
}
- if (dataWriterHolder.getNumberOfPagesAdded() > 0) {
- LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
- + " :Rows Added: " + dataWriterHolder.getTotalRows());
+ if (blockletDataHolder.getNumberOfPagesAdded() > 0) {
+ LOGGER.info("Number of Pages for blocklet is: " + blockletDataHolder.getNumberOfPagesAdded()
+ + " :Rows Added: " + blockletDataHolder.getTotalRows());
writeBlockletToFile();
}
}
}
+ private void addPageData(TablePage tablePage) {
+ blockletDataHolder.addPage(tablePage);
+ if (listener != null) {
+ if (pageId == 0) {
+ listener.onBlockletStart(blockletId);
+ }
+ listener.onPageAdded(blockletId, pageId++, tablePage);
+ }
+ }
+
/**
- * Write one blocklet data to file
+ * Write the collected blocklet data (blockletDataHolder) to file
*/
private void writeBlockletToFile() {
// get the list of all encoded table page
- List<EncodedTablePage> encodedTablePageList = dataWriterHolder.getEncodedTablePages();
+ List<EncodedTablePage> encodedTablePageList = blockletDataHolder.getEncodedTablePages();
int numDimensions = encodedTablePageList.get(0).getNumDimensions();
int numMeasures = encodedTablePageList.get(0).getNumMeasures();
- long blockletDataSize = 0;
// get data chunks for all the column
byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][];
+ long metadataSize = fillDataChunk(encodedTablePageList, dataChunkBytes);
+ // calculate the total size of data to be written
+ long blockletSize = blockletDataHolder.getSize() + metadataSize;
+ // if the data size would exceed the block size, create a new file
+ createNewFileIfReachThreshold(blockletSize);
+
+ // write data to file
+ try {
+ if (fileChannel.size() == 0) {
+ // write the header if file is empty
+ writeHeaderToFile(fileChannel);
+ }
+ writeBlockletToFile(fileChannel, dataChunkBytes);
+ if (listener != null) {
+ listener.onBlockletEnd(blockletId++);
+ }
+ pageId = 0;
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem when writing file", e);
+ }
+ // clear the data holder
+ blockletDataHolder.clear();
+
+ }
+
+ /**
+ * Fill dataChunkBytes and return total size of page metadata
+ */
+ private long fillDataChunk(List<EncodedTablePage> encodedTablePageList, byte[][] dataChunkBytes) {
+ int size = 0;
+ int numDimensions = encodedTablePageList.get(0).getNumDimensions();
+ int numMeasures = encodedTablePageList.get(0).getNumMeasures();
int measureStartIndex = numDimensions;
// calculate the size of data chunks
try {
for (int i = 0; i < numDimensions; i++) {
dataChunkBytes[i] = CarbonUtil.getByteArray(
CarbonMetadataUtil.getDimensionDataChunk3(encodedTablePageList, i));
- blockletDataSize += dataChunkBytes[i].length;
+ size += dataChunkBytes[i].length;
}
for (int i = 0; i < numMeasures; i++) {
dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(
CarbonMetadataUtil.getMeasureDataChunk3(encodedTablePageList, i));
- blockletDataSize += dataChunkBytes[measureStartIndex].length;
+ size += dataChunkBytes[measureStartIndex].length;
measureStartIndex++;
}
} catch (IOException e) {
throw new CarbonDataWriterException("Problem while getting the data chunks", e);
}
- // calculate the total size of data to be written
- blockletDataSize += dataWriterHolder.getSize();
- // to check if data size will exceed the block size then create a new file
- updateBlockletFileChannel(blockletDataSize);
-
- // write data to file
- try {
- if (fileChannel.size() == 0) {
- // write the header if file is empty
- writeHeaderToFile(fileChannel);
- }
- writeBlockletToFile(fileChannel, dataChunkBytes);
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem when writing file", e);
- }
- // clear the data holder
- dataWriterHolder.clear();
+ return size;
}
/**
@@ -210,7 +244,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
List<Long> currentDataChunksOffset = new ArrayList<>();
// to maintain the length of each data chunk in blocklet
List<Integer> currentDataChunksLength = new ArrayList<>();
- List<EncodedTablePage> encodedTablePages = dataWriterHolder.getEncodedTablePages();
+ List<EncodedTablePage> encodedTablePages = blockletDataHolder.getEncodedTablePages();
int numberOfDimension = encodedTablePages.get(0).getNumDimensions();
int numberOfMeasures = encodedTablePages.get(0).getNumMeasures();
ByteBuffer buffer = null;
@@ -258,7 +292,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
encodedTablePages, dataWriterVo.getSegmentProperties().getMeasures()));
BlockletInfo3 blockletInfo3 =
new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength,
- dimensionOffset, measureOffset, dataWriterHolder.getEncodedTablePages().size());
+ dimensionOffset, measureOffset, blockletDataHolder.getEncodedTablePages().size());
blockletMetadata.add(blockletInfo3);
}
@@ -328,10 +362,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
* @throws CarbonDataWriterException
*/
public void closeWriter() throws CarbonDataWriterException {
- CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
- renameCarbonDataFile();
- copyCarbonDataFileToCarbonStorePath(
- this.carbonDataFileTempPath.substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
+ commitCurrentFile(true);
try {
writeIndexFile();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
deleted file mode 100644
index 246fa86..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.writer.v3;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
-
-public class DataWriterHolder {
- private List<EncodedTablePage> encodedTablePage;
- private long currentSize;
-
- public DataWriterHolder() {
- this.encodedTablePage = new ArrayList<EncodedTablePage>();
- }
-
- public void clear() {
- encodedTablePage.clear();
- currentSize = 0;
- }
-
- public void addPage(EncodedTablePage encodedTablePage) {
- this.encodedTablePage.add(encodedTablePage);
- currentSize += encodedTablePage.getEncodedSize();
- }
-
- public long getSize() {
- // increasing it by 15 percent for data chunk 3 of each column each page
- return currentSize + ((currentSize * 15) / 100);
- }
-
- public int getNumberOfPagesAdded() {
- return encodedTablePage.size();
- }
-
- public int getTotalRows() {
- int rows = 0;
- for (EncodedTablePage nh : encodedTablePage) {
- rows += nh.getPageSize();
- }
- return rows;
- }
-
- public List<EncodedTablePage> getEncodedTablePages() {
- return encodedTablePage;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index b46a42c..f823ade 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.datastore.DimensionType;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -50,6 +49,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
[2/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter
interface
Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
deleted file mode 100644
index 5837f0c..0000000
--- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.integration.spark.testsuite.validation;
-
-import org.apache.spark.sql.common.util.CarbonHiveContext;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.reader.CarbonFooterReader;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.format.BlockletIndex;
-import org.apache.carbondata.format.BlockletInfo;
-import org.apache.carbondata.format.DataChunk;
-import org.apache.carbondata.format.Encoding;
-import org.apache.carbondata.format.FileFooter;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class FileFooterValidator {
-
- private static FileFooter fileFooter;
-
- private static boolean setUpIsDone;
-
- @Before public void setUp() throws Exception {
-
- if (setUpIsDone) {
- return;
- }
- CarbonHiveContext.sql(
- "CREATE CUBE validatefooter DIMENSIONS (empno Integer, empname String,"
- + " designation String,"
- + " doj Timestamp, workgroupcategory Integer, workgroupcategoryname String, "
- + "deptno Integer, deptname String, projectcode Integer, projectjoindate Timestamp,"
- + " projectenddate Timestamp) MEASURES (attendance Integer,utilization Integer,"
- + "salary Integer) OPTIONS (PARTITIONER [PARTITION_COUNT=1])");
- CarbonHiveContext.sql(
- "LOAD DATA fact from './src/test/resources/data.csv' INTO CUBE validatefooter "
- + "PARTITIONDATA(DELIMITER ',', QUOTECHAR '\"')");
- String storePath =
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
- CarbonTableIdentifier tableIdentifier =
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "validatefooter", "1");
- String segmentPath = CarbonStorePath.getCarbonTablePath(storePath, tableIdentifier)
- .getCarbonDataDirectoryPath("0", "0");
- CarbonFile carbonFile =
- FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
- CarbonFile[] list = carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile file) {
- if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
- return true;
- }
- return false;
- }
- });
-
- for (CarbonFile file : list) {
- String fileLocation = file.getAbsolutePath();
- CarbonFile factFile =
- FileFactory.getCarbonFile(fileLocation, FileFactory.getFileType(fileLocation));
- long offset = factFile.getSize() - CarbonCommonConstants.LONG_SIZE_IN_BYTE;
- FileHolder fileHolder = FileFactory.getFileHolder(FileFactory.getFileType(fileLocation));
- offset = fileHolder.readLong(fileLocation, offset);
- CarbonFooterReader metaDataReader = new CarbonFooterReader(fileLocation, offset);
- fileFooter = metaDataReader.readFooter();
- }
- setUpIsDone = true;
- }
-
- @AfterClass public static void tearDownAfterClass() {
- CarbonHiveContext.sql("drop CUBE validatefooter");
- }
-
- @Test public void testFileFooterExist() {
- assertTrue(fileFooter != null);
- }
-
- @Test public void testFileFooterVersion() {
- assertTrue(fileFooter.getVersion() >= 0);
- }
-
- @Test public void testFileFooterNumRows() {
- assertTrue(fileFooter.getNum_rows() > 0);
- }
-
- @Test public void testFileFooterTableColumns() {
- assertTrue(fileFooter.getTable_columns() != null && fileFooter.getTable_columns().size() > 0);
- }
-
- @Test public void testFileFooterSegmentInfo() {
- assertTrue(
- fileFooter.getSegment_info() != null && fileFooter.getSegment_info().getNum_cols() > 0
- && fileFooter.getSegment_info().getColumn_cardinalities().size() > 0);
- }
-
- @Test public void testFileFooterBlockletIndex() {
- assertTrue(fileFooter.getBlocklet_index_list() != null
- && fileFooter.getBlocklet_index_list().size() > 0);
- for (BlockletIndex blockletIndex : fileFooter.getBlocklet_index_list()) {
- assertTrue(blockletIndex.getMin_max_index().getMin_values() != null
- && blockletIndex.getMin_max_index().getMin_values().size() > 0
- && blockletIndex.getMin_max_index().getMax_values() != null
- && blockletIndex.getMin_max_index().getMax_values().size() > 0
- && blockletIndex.getMin_max_index().getMin_values().size() == blockletIndex
- .getMin_max_index().getMax_values().size());
- assertTrue(blockletIndex.getB_tree_index().getStart_key() != null
- && blockletIndex.getB_tree_index().getEnd_key() != null);
- }
- }
-
- @Test public void testFileFooterBlockletInfo() {
- assertTrue(fileFooter.getBlocklet_info_list() != null
- && fileFooter.getBlocklet_info_list().size() > 0);
- for (BlockletInfo blockletInfo : fileFooter.getBlocklet_info_list()) {
- assertTrue(blockletInfo.getNum_rows() > 0 && blockletInfo.getColumn_data_chunks() != null
- && blockletInfo.getColumn_data_chunks().size() > 0);
- for (DataChunk columnDataChunk : blockletInfo.getColumn_data_chunks()) {
- testColumnDataChunk(columnDataChunk);
- }
- }
- }
-
- private void testColumnDataChunk(DataChunk columnDatachunk) {
- assertTrue(columnDatachunk.getEncoders() != null && columnDatachunk.getChunk_meta() != null
- && columnDatachunk.getChunk_meta().getCompression_codec() != null);
- // For Measure
- if (columnDatachunk.getEncoders().contains(Encoding.DELTA)) {
- assertTrue(
- columnDatachunk.getPresence() != null && columnDatachunk.getEncoder_meta() != null);
- } else {
- assertTrue(columnDatachunk.getSort_state() != null);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
new file mode 100644
index 0000000..b0e4833
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.events.ChangeEvent
+import org.apache.carbondata.core.indexstore.schema.FilterType
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.util.CarbonProperties
+
+class C2DataMapFactory() extends DataMapFactory {
+
+ override def init(identifier: AbsoluteTableIdentifier,
+ dataMapName: String): Unit = {}
+
+ override def fireEvent(event: ChangeEvent[_]): Unit = ???
+
+ override def clear(segmentId: String): Unit = ???
+
+ override def clear(): Unit = ???
+
+ override def getDataMap(distributable: DataMapDistributable): DataMap = ???
+
+ override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+
+ override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+
+ override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO)
+}
+
+class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+
+ def buildTestData(numRows: Int): DataFrame = {
+ import sqlContext.implicits._
+ sqlContext.sparkContext.parallelize(1 to numRows)
+ .map(x => ("a", "b", x))
+ .toDF("c1", "c2", "c3")
+ }
+
+ def dropTable(): Unit = {
+ sql("DROP TABLE IF EXISTS carbon1")
+ sql("DROP TABLE IF EXISTS carbon2")
+ }
+
+ override def beforeAll {
+ dropTable()
+ }
+
+ test("test write datamap 2 pages") {
+ // register datamap writer
+ DataMapStoreManager.getInstance().createAndRegisterDataMap(
+ AbsoluteTableIdentifier.from(storeLocation, "default", "carbon1"),
+ classOf[C2DataMapFactory],
+ "test")
+
+ val df = buildTestData(33000)
+
+ // save dataframe to carbon file
+ df.write
+ .format("carbondata")
+ .option("tableName", "carbon1")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+ assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+ assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+ "blocklet start 0",
+ "add page data: blocklet 0, page 0",
+ "add page data: blocklet 0, page 1",
+ "blocklet end: 0"
+ ))
+ DataMapWriterSuite.callbackSeq = Seq()
+ }
+
+ test("test write datamap 2 blocklet") {
+ // register datamap writer
+ DataMapStoreManager.getInstance().createAndRegisterDataMap(
+ AbsoluteTableIdentifier.from(storeLocation, "default", "carbon2"),
+ classOf[C2DataMapFactory],
+ "test")
+
+ CarbonProperties.getInstance()
+ .addProperty("carbon.blockletgroup.size.in.mb", "1")
+
+ val df = buildTestData(300000)
+
+ // save dataframe to carbon file
+ df.write
+ .format("carbondata")
+ .option("tableName", "carbon2")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+ assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+ assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+ "blocklet start 0",
+ "add page data: blocklet 0, page 0",
+ "add page data: blocklet 0, page 1",
+ "add page data: blocklet 0, page 2",
+ "add page data: blocklet 0, page 3",
+ "add page data: blocklet 0, page 4",
+ "add page data: blocklet 0, page 5",
+ "add page data: blocklet 0, page 6",
+ "add page data: blocklet 0, page 7",
+ "blocklet end: 0",
+ "blocklet start 1",
+ "add page data: blocklet 1, page 0",
+ "add page data: blocklet 1, page 1",
+ "blocklet end: 1"
+ ))
+ DataMapWriterSuite.callbackSeq = Seq()
+ }
+
+ override def afterAll {
+ dropTable()
+ }
+}
+
+object DataMapWriterSuite {
+ var callbackSeq: Seq[String] = Seq[String]()
+
+ val dataMapWriterC2Mock = new DataMapWriter {
+
+ override def onPageAdded(
+ blockletId: Int,
+ pageId: Int,
+ pages: Array[ColumnPage]): Unit = {
+ assert(pages.length == 1)
+ assert(pages(0).getDataType == DataType.BYTE_ARRAY)
+ val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+ assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+ callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+ }
+
+ override def onBlockletEnd(blockletId: Int): Unit = {
+ callbackSeq :+= s"blocklet end: $blockletId"
+ }
+
+ override def onBlockEnd(blockId: String): Unit = {
+ callbackSeq :+= s"block end $blockId"
+ }
+
+ override def onBlockletStart(blockletId: Int): Unit = {
+ callbackSeq :+= s"blocklet start $blockletId"
+ }
+
+ override def onBlockStart(blockId: String): Unit = {
+ callbackSeq :+= s"block start $blockId"
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index e0829ed..a6a8835 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -256,10 +256,7 @@ object DataManagementFunc {
compactionModel.compactionType
)
- val future: Future[Void] = executor
- .submit(new CompactionCallable(compactionCallableModel
- )
- )
+ val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel))
futureList.add(future)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index a32146a..90f57a9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -31,9 +31,9 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.indexstore.DataMapStoreManager
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 4620db0..1837c04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.indexstore.DataMapStoreManager
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
index e92d06d..697b727 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
@@ -43,6 +43,7 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf
sql("""select c1 from iud_db_sub.dest"""),
Seq(Row("c"), Row("d"), Row("e"))
)
+ sql("drop table if exists iud_db_sub.dest")
}
test("delete data from carbon table[where IN (sub query with where clause) ]") {
@@ -54,10 +55,12 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf
sql("""select c1 from iud_db_sub.dest"""),
Seq(Row("a"), Row("c"), Row("d"), Row("e"))
)
+ sql("drop table if exists iud_db_sub.dest")
}
override def afterAll {
- sql("use default")
+ sql("drop table if exists iud_db_sub.source2")
sql("drop database if exists iud_db_sub cascade")
+ sql("use default")
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
deleted file mode 100644
index 12fe27b..0000000
--- a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-
-/**
- * Generic DataType interface which will be used while data loading for complex types like Array &
- * Struct
- */
-public interface GenericDataType<T> {
-
- /**
- * @return name of the column
- */
- String getName();
-
- /**
- * @return - columns parent name
- */
- String getParentname();
-
- /**
- * @param children - To add children dimension for parent complex type
- */
- void addChildren(GenericDataType children);
-
- /**
- * @param primitiveChild - Returns all primitive type columns in complex type
- */
- void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
-
- /**
- * writes to byte stream
- * @param dataOutputStream
- * @throws IOException
- */
- void writeByteArray(T input, DataOutputStream dataOutputStream)
- throws IOException, DictionaryGenerationException;
-
- /**
- * @return surrogateIndex for primitive column in complex type
- */
- int getSurrogateIndex();
-
- /**
- * @param surrIndex - surrogate index of primitive column in complex type
- */
- void setSurrogateIndex(int surrIndex);
-
- /**
- * converts integer surrogate to bit packed surrogate value
- * @param byteArrayInput
- * @param dataOutputStream
- * @param generator
- * @throws IOException
- * @throws KeyGenException
- */
- void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
- KeyGenerator[] generator) throws IOException, KeyGenException;
-
- /**
- * @return columns count of each complex type
- */
- int getColsCount();
-
- /**
- * @return column uuid string
- */
- String getColumnId();
-
- /**
- * set array index to be referred while creating metadata column
- * @param outputArrayIndex
- */
- void setOutputArrayIndex(int outputArrayIndex);
-
- /**
- * @return array index count of metadata column
- */
- int getMaxOutputArrayIndex();
-
- /**
- * Split byte array into complex metadata column and primitive column
- * @param columnsArray
- * @param inputArray
- */
- void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
-
- /**
- * @return current read row count
- */
- int getDataCounter();
-
- /**
- * fill agg key block including complex types
- * @param aggKeyBlockWithComplex
- * @param aggKeyBlock
- */
- void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
-
- /**
- * fill block key size including complex types
- * @param blockKeySizeWithComplex
- * @param primitiveBlockKeySize
- */
- void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
-
- /**
- * fill cardinality value including complex types
- * @param dimCardWithComplex
- * @param maxSurrogateKeyArray
- */
- void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
-
- /**
- * Fill the cardinality of the primitive datatypes
- * @param dimCardWithComplex
- */
- void fillCardinality(List<Integer> dimCardWithComplex);
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
new file mode 100644
index 0000000..4b0113c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datamap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.processing.store.TablePage;
+
+/**
+ * Registers the DataMap writers of one table and forwards block, blocklet and page events to them
+ */
+public class DataMapWriterListener {
+
+ private static final LogService LOG = LogServiceFactory.getLogService(
+ DataMapWriterListener.class.getCanonicalName());
+
+ // list of indexed column names -> list of DataMap writers registered for those columns
+ private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>();
+
+ /**
+ * Register all DataMap writers for the specified table and segment
+ */
+ public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+ List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+ if (tableDataMaps != null) {
+ for (TableDataMap tableDataMap : tableDataMaps) {
+ DataMapFactory factory = tableDataMap.getDataMapFactory();
+ register(factory, segmentId);
+ }
+ }
+ }
+
+ /**
+ * Register a DataMapWriter
+ */
+ private void register(DataMapFactory factory, String segmentId) {
+ assert (factory != null);
+ assert (segmentId != null);
+ DataMapMeta meta = factory.getMeta();
+ if (meta == null) {
+ // if data map does not have meta, no need to register
+ return;
+ }
+ List<String> columns = factory.getMeta().getIndexedColumns();
+ List<DataMapWriter> writers = registry.get(columns);
+ DataMapWriter writer = factory.createWriter(segmentId);
+ if (writers != null) {
+ writers.add(writer);
+ } else {
+ writers = new ArrayList<>();
+ writers.add(writer);
+ registry.put(columns, writers);
+ }
+ LOG.info("DataMapWriter " + writer + " added");
+ }
+
+ public void onBlockStart(String blockId) {
+ for (List<DataMapWriter> writers : registry.values()) {
+ for (DataMapWriter writer : writers) {
+ writer.onBlockStart(blockId);
+ }
+ }
+ }
+
+ public void onBlockEnd(String blockId) {
+ for (List<DataMapWriter> writers : registry.values()) {
+ for (DataMapWriter writer : writers) {
+ writer.onBlockEnd(blockId);
+ }
+ }
+ }
+
+ public void onBlockletStart(int blockletId) {
+ for (List<DataMapWriter> writers : registry.values()) {
+ for (DataMapWriter writer : writers) {
+ writer.onBlockletStart(blockletId);
+ }
+ }
+ }
+
+ public void onBlockletEnd(int blockletId) {
+ for (List<DataMapWriter> writers : registry.values()) {
+ for (DataMapWriter writer : writers) {
+ writer.onBlockletEnd(blockletId);
+ }
+ }
+ }
+
+ /**
+ * Pick the pages of the indexed columns and pass them to every registered DataMap writer
+ *
+ * @param pageId sequence number of page, start from 0
+ * @param tablePage page data
+ */
+ public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
+ Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
+ for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
+ List<String> indexedColumns = entry.getKey();
+ ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
+ for (int i = 0; i < indexedColumns.size(); i++) {
+ pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
+ }
+ List<DataMapWriter> writers = entry.getValue();
+ for (DataMapWriter writer : writers) {
+ writer.onPageAdded(blockletId, pageId, pages);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index f5fdd4d..02ceb06 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
new file mode 100644
index 0000000..6b54d2d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datatypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+
+/**
+ * Generic DataType interface which will be used while data loading for complex types like Array &
+ * Struct
+ */
+public interface GenericDataType<T> {
+
+ /**
+ * @return name of the column
+ */
+ String getName();
+
+ /**
+ * @return - columns parent name
+ */
+ String getParentname();
+
+ /**
+ * @param children - To add children dimension for parent complex type
+ */
+ void addChildren(GenericDataType children);
+
+ /**
+ * @param primitiveChild - Returns all primitive type columns in complex type
+ */
+ void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
+
+ /**
+ * writes to byte stream
+ * @param dataOutputStream
+ * @throws IOException
+ */
+ void writeByteArray(T input, DataOutputStream dataOutputStream)
+ throws IOException, DictionaryGenerationException;
+
+ /**
+ * @return surrogateIndex for primitive column in complex type
+ */
+ int getSurrogateIndex();
+
+ /**
+ * @param surrIndex - surrogate index of primitive column in complex type
+ */
+ void setSurrogateIndex(int surrIndex);
+
+ /**
+ * converts integer surrogate to bit packed surrogate value
+ * @param byteArrayInput
+ * @param dataOutputStream
+ * @param generator
+ * @throws IOException
+ * @throws KeyGenException
+ */
+ void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+ KeyGenerator[] generator) throws IOException, KeyGenException;
+
+ /**
+ * @return columns count of each complex type
+ */
+ int getColsCount();
+
+ /**
+ * @return column uuid string
+ */
+ String getColumnId();
+
+ /**
+ * set array index to be referred while creating metadata column
+ * @param outputArrayIndex
+ */
+ void setOutputArrayIndex(int outputArrayIndex);
+
+ /**
+ * @return array index count of metadata column
+ */
+ int getMaxOutputArrayIndex();
+
+ /**
+ * Split byte array into complex metadata column and primitive column
+ * @param columnsArray
+ * @param inputArray
+ */
+ void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
+
+ /**
+ * @return current read row count
+ */
+ int getDataCounter();
+
+ /**
+ * fill agg key block including complex types
+ * @param aggKeyBlockWithComplex
+ * @param aggKeyBlock
+ */
+ void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
+
+ /**
+ * fill block key size including complex types
+ * @param blockKeySizeWithComplex
+ * @param primitiveBlockKeySize
+ */
+ void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
+
+ /**
+ * fill cardinality value including complex types
+ * @param dimCardWithComplex
+ * @param maxSurrogateKeyArray
+ */
+ void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
+
+ /**
+ * Fill the cardinality of the primitive datatypes
+ * @param dimCardWithComplex
+ */
+ void fillCardinality(List<Integer> dimCardWithComplex);
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index a24a324..e7e48e9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 94ee9f6..a61144e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
index d5730a2..8feea6a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
@@ -21,8 +21,8 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.List;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index d30582b..e9b0a78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -23,13 +23,13 @@ import java.util.Map;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.newflow.DataField;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 9c48af7..a716340 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -35,10 +35,8 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -51,6 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
import org.apache.carbondata.processing.store.file.FileManager;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -74,6 +73,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* data writer
*/
private CarbonFactDataWriter dataWriter;
+
/**
* File manager
*/
@@ -87,11 +87,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process
* once this size of input is reached
*/
- private int blockletSize;
- /**
- * keyGenerator
- */
- private ColumnarSplitter columnarSplitter;
+ private int pageSize;
/**
* keyBlockHolder
*/
@@ -120,7 +116,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/**
* a private class that will hold the data for blocklets
*/
- private BlockletDataHolder blockletDataHolder;
+ private TablePageList tablePageList;
/**
* number of cores configured
*/
@@ -146,8 +142,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private ColumnarFormatVersion version;
- private SortScopeOptions.SortScope sortScope;
-
/**
* CarbonFactDataHandler constructor
*/
@@ -202,11 +196,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
noInvertedIdxCol += (cd.getColName() + ",");
}
}
+
LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol);
}
private void initParameters(CarbonFactDataHandlerModel model) {
- this.sortScope = model.getSortScope();
+ SortScopeOptions.SortScope sortScope = model.getSortScope();
this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
//TODO need to pass carbon table identifier to metadata
@@ -254,10 +249,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
consumerExecutorService = Executors.newFixedThreadPool(1);
consumerExecutorServiceTaskList = new ArrayList<>(1);
semaphore = new Semaphore(numberOfCores);
- blockletDataHolder = new BlockletDataHolder();
+ tablePageList = new TablePageList();
// Start the consumer which will take each blocklet/page in order and write to a file
- Consumer consumer = new Consumer(blockletDataHolder);
+ Consumer consumer = new Consumer(tablePageList);
consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
}
@@ -314,20 +309,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.entryCount++;
// if entry count reaches to leaf node size then we are ready to write
// this to leaf node file and update the intermediate files
- if (this.entryCount == this.blockletSize) {
+ if (this.entryCount == this.pageSize) {
try {
semaphore.acquire();
producerExecutorServiceTaskList.add(
producerExecutorService.submit(
- new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
+ new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, false)
)
);
blockletProcessingCount.incrementAndGet();
// set the entry count to zero
processedDataCount += entryCount;
LOGGER.info("Total Number Of records added to store: " + processedDataCount);
- dataRows = new ArrayList<>(this.blockletSize);
+ dataRows = new ArrayList<>(this.pageSize);
this.entryCount = 0;
} catch (InterruptedException e) {
LOGGER.error(e, e.getMessage());
@@ -339,10 +334,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/**
* generate the EncodedTablePage from the input rows (one page in case of V3 format)
*/
- private EncodedTablePage processDataRows(List<CarbonRow> dataRows)
+ private TablePage processDataRows(List<CarbonRow> dataRows)
throws CarbonDataWriterException, KeyGenException, MemoryException, IOException {
if (dataRows.size() == 0) {
- return EncodedTablePage.newEmptyInstance();
+ return new TablePage(model, 0);
}
TablePage tablePage = new TablePage(model, dataRows.size());
int rowId = 0;
@@ -352,11 +347,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
tablePage.addRow(rowId++, row);
}
- EncodedTablePage encoded = tablePage.encode();
- tablePage.freeMemory();
+ tablePage.encode();
LOGGER.info("Number Of records processed: " + dataRows.size());
- return encoded;
+ return tablePage;
}
/**
@@ -370,7 +364,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
try {
semaphore.acquire();
producerExecutorServiceTaskList.add(producerExecutorService
- .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
+ .submit(new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, true)));
blockletProcessingCount.incrementAndGet();
processedDataCount += entryCount;
closeWriterExecutionService(producerExecutorService);
@@ -471,19 +465,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private void setWritingConfiguration() throws CarbonDataWriterException {
// get blocklet size
- this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+ this.pageSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
if (version == ColumnarFormatVersion.V3) {
- this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+ this.pageSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
}
- LOGGER.info("Number of rows per column blocklet " + blockletSize);
- dataRows = new ArrayList<>(this.blockletSize);
+ LOGGER.info("Number of rows per column blocklet " + pageSize);
+ dataRows = new ArrayList<>(this.pageSize);
int dimSet =
Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
- // if atleast one dimension is present then initialize column splitter otherwise null
+ // if at least one dimension is present then initialize column splitter otherwise null
int noOfColStore = colGrpModel.getNoOfColumnStore();
int[] keyBlockSize = new int[noOfColStore + getExpandedComplexColsCount()];
@@ -494,16 +488,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
//row store will be in single column store
//e.g if {0,1,2,3,4,5} is dimension and {0,1,2) is row store dimension
//than below splitter will return column as {0,1,2}{3}{4}{5}
- this.columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
+ ColumnarSplitter columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore);
this.keyBlockHolder =
- new CarbonKeyBlockHolder[this.columnarSplitter.getBlockKeySize().length];
+ new CarbonKeyBlockHolder[columnarSplitter.getBlockKeySize().length];
} else {
this.keyBlockHolder = new CarbonKeyBlockHolder[0];
}
for (int i = 0; i < keyBlockHolder.length; i++) {
- this.keyBlockHolder[i] = new CarbonKeyBlockHolder(blockletSize);
+ this.keyBlockHolder[i] = new CarbonKeyBlockHolder(pageSize);
this.keyBlockHolder[i].resetCounter();
}
@@ -535,7 +529,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
.getBlockKeySize());
System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
blockKeySize.length - noOfColStore);
- this.dataWriter = getFactDataWriter(keyBlockSize);
+ this.dataWriter = getFactDataWriter();
this.dataWriter.setIsNoDictionary(isNoDictionary);
// initialize the channel;
this.dataWriter.initializeWriter();
@@ -574,21 +568,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/**
* Below method will be used to get the fact data writer instance
*
- * @param keyBlockSize
* @return data writer instance
*/
- private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
+ private CarbonFactDataWriter<?> getFactDataWriter() {
return CarbonDataWriterFactory.getInstance()
- .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
+ .getFactDataWriter(version, getDataWriterVo());
}
/**
* Below method will be used to get the writer vo
*
- * @param keyBlockSize size of each key block
* @return data writer vo object
*/
- private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) {
+ private CarbonDataWriterVo getDataWriterVo() {
CarbonDataWriterVo carbonDataWriterVo = new CarbonDataWriterVo();
carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
@@ -608,6 +600,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
carbonDataWriterVo.setBucketNumber(model.getBucketId());
carbonDataWriterVo.setTaskExtension(model.getTaskExtension());
carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp());
+ carbonDataWriterVo.setListener(model.getDataMapWriterlistener());
return carbonDataWriterVo;
}
@@ -644,14 +637,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
/**
- * This class will hold the holder objects and manage producer and consumer for reading
- * and writing the blocklet data
+ * This class will hold the table page data
*/
- private final class BlockletDataHolder {
+ private final class TablePageList {
/**
- * array of blocklet data holder objects
+ * array of table pages added by the Producer and fetched by the Consumer
*/
- private EncodedTablePage[] encodedTablePages;
+ private TablePage[] tablePages;
/**
* flag to check whether the producer has completed processing for holder
* object which is required to be picked form an index
@@ -662,8 +654,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private int currentIndex;
- private BlockletDataHolder() {
- encodedTablePages = new EncodedTablePage[numberOfCores];
+ private TablePageList() {
+ tablePages = new TablePage[numberOfCores];
available = new AtomicBoolean(false);
}
@@ -671,32 +663,32 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @return a node holder object
* @throws InterruptedException if consumer thread is interrupted
*/
- public synchronized EncodedTablePage get() throws InterruptedException {
- EncodedTablePage encodedTablePage = encodedTablePages[currentIndex];
+ public synchronized TablePage get() throws InterruptedException {
+ TablePage tablePage = tablePages[currentIndex];
// if node holder is null means producer thread processing the data which has to
// be inserted at this current index has not completed yet
- if (null == encodedTablePage && !processingComplete) {
+ if (null == tablePage && !processingComplete) {
available.set(false);
}
while (!available.get()) {
wait();
}
- encodedTablePage = encodedTablePages[currentIndex];
- encodedTablePages[currentIndex] = null;
+ tablePage = tablePages[currentIndex];
+ tablePages[currentIndex] = null;
currentIndex++;
// reset current index when it reaches length of node holder array
- if (currentIndex >= encodedTablePages.length) {
+ if (currentIndex >= tablePages.length) {
currentIndex = 0;
}
- return encodedTablePage;
+ return tablePage;
}
/**
* @param encodedTablePage
* @param index
*/
- public synchronized void put(EncodedTablePage encodedTablePage, int index) {
- encodedTablePages[index] = encodedTablePage;
+ public synchronized void put(TablePage tablePage, int index) {
+ tablePages[index] = tablePage;
// notify the consumer thread when index at which object is to be inserted
// becomes equal to current index from where data has to be picked for writing
if (index == currentIndex) {
@@ -711,16 +703,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private final class Producer implements Callable<Void> {
- private BlockletDataHolder blockletDataHolder;
+ private TablePageList tablePageList;
private List<CarbonRow> dataRows;
- private int sequenceNumber;
+ private int pageId;
private boolean isLastPage;
- private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows,
- int sequenceNumber, boolean isLastPage) {
- this.blockletDataHolder = blockletDataHolder;
+ private Producer(TablePageList tablePageList, List<CarbonRow> dataRows,
+ int pageId, boolean isLastPage) {
+ this.tablePageList = tablePageList;
this.dataRows = dataRows;
- this.sequenceNumber = sequenceNumber;
+ this.pageId = pageId;
this.isLastPage = isLastPage;
}
@@ -732,11 +724,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
@Override public Void call() throws Exception {
try {
- EncodedTablePage encodedTablePage = processDataRows(dataRows);
- encodedTablePage.setIsLastPage(isLastPage);
+ TablePage tablePage = processDataRows(dataRows);
+ tablePage.setIsLastPage(isLastPage);
// insert the object in array according to sequence number
- int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
- blockletDataHolder.put(encodedTablePage, indexInNodeHolderArray);
+ int indexInNodeHolderArray = (pageId - 1) % numberOfCores;
+ tablePageList.put(tablePage, indexInNodeHolderArray);
return null;
} catch (Throwable throwable) {
LOGGER.error(throwable, "Error in producer");
@@ -752,10 +744,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private final class Consumer implements Callable<Void> {
- private BlockletDataHolder blockletDataHolder;
+ private TablePageList tablePageList;
- private Consumer(BlockletDataHolder blockletDataHolder) {
- this.blockletDataHolder = blockletDataHolder;
+ private Consumer(TablePageList tablePageList) {
+ this.tablePageList = tablePageList;
}
/**
@@ -766,11 +758,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
@Override public Void call() throws Exception {
while (!processingComplete || blockletProcessingCount.get() > 0) {
- EncodedTablePage encodedTablePage = null;
+ TablePage tablePage = null;
try {
- encodedTablePage = blockletDataHolder.get();
- if (null != encodedTablePage) {
- dataWriter.writeTablePage(encodedTablePage);
+ tablePage = tablePageList.get();
+ if (null != tablePage) {
+ dataWriter.writeTablePage(tablePage);
+ tablePage.freeMemory();
}
blockletProcessingCount.decrementAndGet();
} catch (Throwable throwable) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 51ec84b..c059030 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -39,6 +38,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
@@ -159,6 +160,8 @@ public class CarbonFactDataHandlerModel {
private SortScopeOptions.SortScope sortScope;
+ private DataMapWriterListener dataMapWriterlistener;
+
/**
* Create the model using @{@link CarbonDataLoadConfiguration}
*/
@@ -254,6 +257,11 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.taskExtension = taskExtension;
carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+
+ DataMapWriterListener listener = new DataMapWriterListener();
+ listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId());
+ carbonFactDataHandlerModel.dataMapWriterlistener = listener;
+
return carbonFactDataHandlerModel;
}
@@ -557,5 +565,9 @@ public class CarbonFactDataHandlerModel {
public SortScopeOptions.SortScope getSortScope() {
return sortScope;
}
+
+ public DataMapWriterListener getDataMapWriterlistener() {
+ return dataMapWriterlistener;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 03f3e5e..d2363f1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.DimensionType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -44,9 +44,8 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.spark.sql.types.Decimal;
@@ -73,7 +72,12 @@ public class TablePage {
private TablePageKey key;
- private ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+ private EncodedTablePage encodedTablePage;
+
+ private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+
+ // true if it is last page of all input rows
+ private boolean isLastPage;
TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
this.model = model;
@@ -240,14 +244,16 @@ public class TablePage {
return output;
}
- EncodedTablePage encode() throws KeyGenException, MemoryException, IOException {
+ void encode() throws KeyGenException, MemoryException, IOException {
// encode dimensions and measure
EncodedDimensionPage[] dimensions = encodeAndCompressDimensions();
EncodedMeasurePage[] measures = encodeAndCompressMeasures();
- return EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
+ this.encodedTablePage = EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
}
- private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+ public EncodedTablePage getEncodedTablePage() {
+ return encodedTablePage;
+ }
// apply measure and set encodedData in `encodedData`
private EncodedMeasurePage[] encodeAndCompressMeasures()
@@ -301,6 +307,52 @@ public class TablePage {
encodedDimensions.addAll(encodedComplexDimenions);
return encodedDimensions.toArray(new EncodedDimensionPage[encodedDimensions.size()]);
}
+
+ /**
+ * return column page of specified column name
+ */
+ public ColumnPage getColumnPage(String columnName) {
+ int dictDimensionIndex = -1;
+ int noDictDimensionIndex = -1;
+ ColumnPage page = null;
+ TableSpec spec = model.getTableSpec();
+ int numDimensions = spec.getNumDimensions();
+ for (int i = 0; i < numDimensions; i++) {
+ DimensionType type = spec.getDimensionSpec(i).getDimensionType();
+ if ((type == DimensionType.GLOBAL_DICTIONARY) || (type == DimensionType.DIRECT_DICTIONARY)) {
+ page = dictDimensionPages[++dictDimensionIndex];
+ } else if (type == DimensionType.PLAIN_VALUE) {
+ page = noDictDimensionPages[++noDictDimensionIndex];
+ } else {
+ // do not support datamap on complex column
+ continue;
+ }
+ String fieldName = spec.getDimensionSpec(i).getFieldName();
+ if (fieldName.equalsIgnoreCase(columnName)) {
+ return page;
+ }
+ }
+ int numMeasures = spec.getNumMeasures();
+ for (int i = 0; i < numMeasures; i++) {
+ String fieldName = spec.getMeasureSpec(i).getFieldName();
+ if (fieldName.equalsIgnoreCase(columnName)) {
+ return measurePage[i];
+ }
+ }
+ throw new IllegalArgumentException("DataMap: must have '" + columnName + "' column in schema");
+ }
+
+ public boolean isLastPage() {
+ return isLastPage;
+ }
+
+ public void setIsLastPage(boolean isWriteAll) {
+ this.isLastPage = isWriteAll;
+ }
+
+ public int getPageSize() {
+ return pageSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a34ed01..bcc0112 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -24,7 +24,6 @@ import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-//import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
@@ -44,17 +43,14 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonMergerUtil;
@@ -66,6 +62,7 @@ import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.IndexHeader;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.file.FileData;
import org.apache.commons.lang3.ArrayUtils;
@@ -97,11 +94,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
protected String carbonDataFileTempPath;
/**
- * The name of carbonData file
+ * The name of carbonData file (blockId)
*/
protected String carbonDataFileName;
/**
+ * The sequence number of blocklet inside one block
+ */
+ protected int blockletId = 0;
+
+ /**
+ * The sequence number of page inside one blocklet
+ */
+ protected int pageId = 0;
+
+ /**
* Local cardinality for the segment
*/
protected int[] localCardinality;
@@ -132,7 +139,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
/**
* data block size for one carbon data file
*/
- private long dataBlockSize;
+ private long blockSizeThreshold;
/**
* file size at any given point
*/
@@ -152,6 +159,11 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
+ /**
+ * listener to write data map
+ */
+ protected DataMapWriterListener listener;
+
public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
this.dataWriterVo = dataWriterVo;
this.blockletInfoList =
@@ -163,22 +175,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.fileSizeInBytes =
(long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
* CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
- /*
- size reserved in one file for writing block meta data. It will be in percentage
- */
+
+ // size reserved in one file for writing block meta data. It will be in percentage
int spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
.getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
- this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
- LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + dataBlockSize);
+ this.blockSizeThreshold =
+ fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
+ LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
+ blockSizeThreshold);
this.executorService = Executors.newFixedThreadPool(1);
executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// in case of compaction we will pass the cardinality.
this.localCardinality = dataWriterVo.getColCardinality();
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo
- .getTableName());
+
//TODO: We should delete the levelmetadata file after reading here.
// so only data loading flow will need to read from cardinality file.
if (null == this.localCardinality) {
@@ -202,6 +213,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.dataChunksLength = new ArrayList<>();
blockletMetadata = new ArrayList<BlockletInfo3>();
blockletIndex = new ArrayList<>();
+ listener = dataWriterVo.getListener();
}
/**
@@ -241,18 +253,19 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
}
/**
- * This method will be used to update the file channel with new file; new
- * file will be created once existing file reached the file size limit This
+ * This method will be used to switch the file channel to a new file when the block size
+ * threshold is exceeded; a new file is created once the existing one reaches its limit. This
* method will first check whether existing file size is exceeded the file
* size limit if yes then write the leaf metadata to file then set the
* current file size to 0 close the existing file channel get the new file
* name and get the channel for new file
*
- * @param blockletDataSize data size of one block
+ * @param blockletSizeToBeAdded data size of the blocklet to be added
* @throws CarbonDataWriterException if any problem
*/
- protected void updateBlockletFileChannel(long blockletDataSize) throws CarbonDataWriterException {
- if ((currentFileSize + blockletDataSize) >= dataBlockSize && currentFileSize != 0) {
+ protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
+ throws CarbonDataWriterException {
+ if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
// set the current file size to zero
LOGGER.info("Writing data to file as max file size reached for file: "
+ carbonDataFileTempPath + " .Data block size: " + currentFileSize);
@@ -265,16 +278,42 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.dataChunksLength = new ArrayList<>();
this.blockletMetadata = new ArrayList<>();
this.blockletIndex = new ArrayList<>();
- CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
- // rename carbon data file from in progress status to actual
- renameCarbonDataFile();
- executorServiceSubmitList.add(executorService
- .submit(new CopyThread(this.carbonDataFileTempPath
- .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')))));
+ commitCurrentFile(false);
// initialize the new channel
initializeWriter();
}
- currentFileSize += blockletDataSize;
+ currentFileSize += blockletSizeToBeAdded;
+ }
+
+ private void notifyDataMapBlockStart() {
+ if (listener != null) {
+ listener.onBlockStart(carbonDataFileName);
+ }
+ }
+
+ private void notifyDataMapBlockEnd() {
+ if (listener != null) {
+ listener.onBlockEnd(carbonDataFileName);
+ }
+ blockletId = 0;
+ }
+
+ /**
+ * Finish the current file: close its streams, rename the temp file, then copy it to the store
+ * @param copyInCurrentThread set to false to do the data copy in a separate thread
+ */
+ protected void commitCurrentFile(boolean copyInCurrentThread) {
+ notifyDataMapBlockEnd();
+ CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+ // rename carbon data file from in progress status to actual
+ renameCarbonDataFile();
+ String fileName = this.carbonDataFileTempPath.substring(0,
+ this.carbonDataFileTempPath.lastIndexOf('.'));
+ if (copyInCurrentThread) {
+ copyCarbonDataFileToCarbonStorePath(fileName);
+ } else {
+ executorServiceSubmitList.add(executorService.submit(new CopyThread(fileName)));
+ }
}
/**
@@ -310,6 +349,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
throw new CarbonDataWriterException("Problem while getting the FileChannel for Leaf File",
fileNotFoundException);
}
+ notifyDataMapBlockStart();
}
private int initFileCount() {
@@ -433,18 +473,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* @throws CarbonDataWriterException
*/
public void closeWriter() throws CarbonDataWriterException {
- CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
if (this.blockletInfoList.size() > 0) {
- renameCarbonDataFile();
- copyCarbonDataFileToCarbonStorePath(
- this.carbonDataFileTempPath
- .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
+ commitCurrentFile(true);
try {
writeIndexFile();
} catch (IOException e) {
throw new CarbonDataWriterException("Problem while writing the index file", e);
}
}
+ CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
closeExecutorService();
}
@@ -590,17 +627,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
}
/**
- * This method will be used to write leaf data to file
- * file format
- * <key><measure1><measure2>....
- *
- * @throws CarbonDataWriterException
- * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
- */
- public abstract void writeTablePage(EncodedTablePage encodedTablePage)
- throws CarbonDataWriterException;
-
- /**
* Below method will be used to update the min or max value
* by removing the length from it
*
@@ -608,10 +634,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
protected byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
return valueWithLength;
-// ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
-// byte[] actualValue = new byte[buffer.getShort()];
-// buffer.get(actualValue);
-// return actualValue;
}
/**
@@ -640,5 +662,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
copyCarbonDataFileToCarbonStorePath(fileName);
return null;
}
+
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 225e031..26fff09 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -20,6 +20,7 @@ import java.util.List;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -64,6 +65,8 @@ public class CarbonDataWriterVo {
private int taskExtension;
+ private DataMapWriterListener listener;
+
/**
* @return the storeLocation
*/
@@ -303,4 +306,12 @@ public class CarbonDataWriterVo {
public void setTaskExtension(int taskExtension) {
this.taskExtension = taskExtension;
}
+
+ public void setListener(DataMapWriterListener listener) {
+ this.listener = listener;
+ }
+
+ public DataMapWriterListener getListener() {
+ return listener;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index f194f74..3b26b7c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -18,14 +18,15 @@
package org.apache.carbondata.processing.store.writer;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.processing.store.TablePage;
public interface CarbonFactDataWriter<T> {
/**
* write a encoded table page
+ * @param tablePage the table page to write
*/
- void writeTablePage(EncodedTablePage encodedTablePage) throws CarbonDataWriterException;
+ void writeTablePage(TablePage tablePage) throws CarbonDataWriterException;
/**
* Below method will be used to write the leaf meta data to file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index 0f1b52b..f849e21 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.writer.CarbonFooterWriter;
import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.TablePage;
import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
@@ -199,14 +200,14 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
return holder;
}
- @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+ @Override public void writeTablePage(TablePage tablePage)
throws CarbonDataWriterException {
- if (encodedTablePage.getPageSize() == 0) {
+ if (tablePage.getPageSize() == 0) {
return;
}
- long blockletDataSize = encodedTablePage.getEncodedSize();
- updateBlockletFileChannel(blockletDataSize);
- NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
+ long blockletDataSize = tablePage.getEncodedTablePage().getEncodedSize();
+ createNewFileIfReachThreshold(blockletDataSize);
+ NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage());
// write data to file and get its offset
long offset = writeDataToFile(nodeHolder, fileChannel);
// get the blocklet info for currently added blocklet
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index e19a5ce..3f49a7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.writer.CarbonFooterWriter;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.TablePage;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
@@ -63,19 +64,19 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
/**
* Below method will be used to write the data to carbon data file
*
- * @param encodedTablePage
+ * @param tablePage
* @throws CarbonDataWriterException any problem in writing operation
*/
- @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+ @Override public void writeTablePage(TablePage tablePage)
throws CarbonDataWriterException {
- NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
- if (encodedTablePage.getPageSize() == 0) {
+ NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage());
+ if (tablePage.getPageSize() == 0) {
return;
}
// size to calculate the size of the blocklet
int size = 0;
// get the blocklet info object
- BlockletInfoColumnar blockletInfo = getBlockletInfo(encodedTablePage, 0);
+ BlockletInfoColumnar blockletInfo = getBlockletInfo(tablePage.getEncodedTablePage(), 0);
List<DataChunk2> datachunks = null;
try {
@@ -105,7 +106,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength() + size;
// if size of the file already reached threshold size then create a new file and get the file
// channel object
- updateBlockletFileChannel(blockletDataSize);
+ createNewFileIfReachThreshold(blockletDataSize);
// writer the version header in the file if current file size is zero
// this is done so carbondata file can be read separately
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
new file mode 100644
index 0000000..68aee95
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.processing.store.TablePage;
+
+public class BlockletDataHolder {
+  private List<EncodedTablePage> encodedPages; // encoded form of each page added
+  private List<TablePage> rawTablePages; // raw pages, in the order they were added
+  private long currentSize; // running sum of the encoded size of the added pages
+  /** Creates a holder with no pages. */
+  public BlockletDataHolder() {
+    encodedPages = new ArrayList<>();
+    rawTablePages = new ArrayList<>();
+  }
+  /** Removes every page and resets the accumulated size to zero. */
+  public void clear() {
+    currentSize = 0;
+    rawTablePages.clear();
+    encodedPages.clear();
+  }
+  /** Stores the raw page and its encoded form, and adds the encoded size to the running total. */
+  public void addPage(TablePage rawTablePage) {
+    EncodedTablePage encoded = rawTablePage.getEncodedTablePage();
+    encodedPages.add(encoded);
+    rawTablePages.add(rawTablePage);
+    currentSize += encoded.getEncodedSize();
+  }
+  /** Returns the accumulated encoded size increased by 15 percent. */
+  public long getSize() {
+    long chunkAllowance = (currentSize * 15) / 100; // for data chunk 3 of each column each page
+    return currentSize + chunkAllowance;
+  }
+  /** Returns how many pages have been added since creation or the last clear(). */
+  public int getNumberOfPagesAdded() {
+    return encodedPages.size();
+  }
+  /** Returns the sum of getPageSize() over all encoded pages held. */
+  public int getTotalRows() {
+    int total = 0;
+    for (int i = 0; i < encodedPages.size(); i++) {
+      total += encodedPages.get(i).getPageSize();
+    }
+    return total;
+  }
+  /** Returns the internal list of encoded pages (not a copy). */
+  public List<EncodedTablePage> getEncodedTablePages() {
+    return encodedPages;
+  }
+  /** Returns the internal list of raw pages (not a copy). */
+  public List<TablePage> getRawTablePages() {
+    return rawTablePages;
+  }
+}
[3/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter
interface
Posted by ra...@apache.org.
[CARBONDATA-1363] Add DataMapWriter interface
This closes #1238
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f089287c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f089287c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f089287c
Branch: refs/heads/master
Commit: f089287cef1d685b81e8fa26868325503acdb635
Parents: 85cbad2
Author: Jacky Li <ja...@qq.com>
Authored: Thu Aug 10 13:36:18 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Aug 14 01:30:40 2017 +0530
----------------------------------------------------------------------
.../core/datamap/DataMapDistributable.java | 56 +++
.../carbondata/core/datamap/DataMapMeta.java | 42 ++
.../core/datamap/DataMapStoreManager.java | 144 +++++++
.../carbondata/core/datamap/TableDataMap.java | 142 +++++++
.../carbondata/core/datamap/dev/DataMap.java | 57 +++
.../core/datamap/dev/DataMapFactory.java | 73 ++++
.../core/datamap/dev/DataMapWriter.java | 58 +++
.../core/datastore/page/EncodedTablePage.java | 11 -
.../indexstore/BlockletDataMapIndexStore.java | 13 +-
.../carbondata/core/indexstore/DataMap.java | 60 ---
.../core/indexstore/DataMapDistributable.java | 56 ---
.../core/indexstore/DataMapFactory.java | 87 ----
.../core/indexstore/DataMapStoreManager.java | 139 -------
.../carbondata/core/indexstore/DataMapType.java | 36 --
.../core/indexstore/DataMapWriter.java | 50 ---
.../core/indexstore/TableDataMap.java | 133 ------
.../blockletindex/BlockletDataMap.java | 45 +-
.../blockletindex/BlockletDataMapFactory.java | 46 +-
.../core/metadata/AbsoluteTableIdentifier.java | 4 +
.../core/metadata/CarbonTableIdentifier.java | 6 +-
.../apache/carbondata/core/util/CarbonUtil.java | 6 +
.../hadoop/api/CarbonTableInputFormat.java | 16 +-
.../spark/load/CarbonLoaderUtilTest.java | 417 -------------------
.../validation/FileFooterValidator.java | 155 -------
.../testsuite/datamap/DataMapWriterSuite.scala | 180 ++++++++
.../spark/rdd/DataManagementFunc.scala | 5 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 2 +-
.../spark/sql/hive/CarbonHiveMetaStore.scala | 2 +-
.../iud/DeleteCarbonTableSubqueryTestCase.scala | 5 +-
.../core/datastore/GenericDataType.java | 145 -------
.../datamap/DataMapWriterListener.java | 138 ++++++
.../processing/datatypes/ArrayDataType.java | 1 -
.../processing/datatypes/GenericDataType.java | 145 +++++++
.../processing/datatypes/PrimitiveDataType.java | 1 -
.../processing/datatypes/StructDataType.java | 1 -
.../impl/ComplexFieldConverterImpl.java | 2 +-
.../converter/impl/FieldEncoderFactory.java | 2 +-
.../store/CarbonFactDataHandlerColumnar.java | 131 +++---
.../store/CarbonFactDataHandlerModel.java | 14 +-
.../carbondata/processing/store/TablePage.java | 66 ++-
.../store/writer/AbstractFactDataWriter.java | 115 +++--
.../store/writer/CarbonDataWriterVo.java | 11 +
.../store/writer/CarbonFactDataWriter.java | 5 +-
.../writer/v1/CarbonFactDataWriterImplV1.java | 11 +-
.../writer/v2/CarbonFactDataWriterImplV2.java | 13 +-
.../store/writer/v3/BlockletDataHolder.java | 72 ++++
.../writer/v3/CarbonFactDataWriterImplV3.java | 131 +++---
.../store/writer/v3/DataWriterHolder.java | 62 ---
.../util/CarbonDataProcessorUtil.java | 2 +-
49 files changed, 1502 insertions(+), 1612 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
new file mode 100644
index 0000000..517f629
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+
+/**
+ * Distributable class for datamap; carries the table path, segment id and datamap name.
+ */
+public abstract class DataMapDistributable implements Distributable {
+
+ private String tablePath;
+
+ private String segmentId;
+
+ private String dataMapName;
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ public void setTablePath(String tablePath) {
+ this.tablePath = tablePath;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.segmentId = segmentId;
+ }
+
+ public String getDataMapName() {
+ return dataMapName;
+ }
+
+ public void setDataMapName(String dataMapName) {
+ this.dataMapName = dataMapName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
new file mode 100644
index 0000000..7746acf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+
+public class DataMapMeta {
+ /** Columns indexed by the datamap (NOTE(review): presumably column names; confirm). */
+ private List<String> indexedColumns;
+ /** Filter operation the datamap is optimized for (NOTE(review): inferred from the name). */
+ private FilterType optimizedOperation;
+
+ public DataMapMeta(List<String> indexedColumns, FilterType optimizedOperation) {
+ this.indexedColumns = indexedColumns;
+ this.optimizedOperation = optimizedOperation;
+ }
+
+ public List<String> getIndexedColumns() {
+ return indexedColumns;
+ }
+
+ public FilterType getOptimizedOperation() {
+ return optimizedOperation;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
new file mode 100644
index 0000000..f5bc22f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * It maintains all the DataMaps in it, keyed by the unique name of the table they belong to.
+ */
+public final class DataMapStoreManager {
+
+  private static DataMapStoreManager instance = new DataMapStoreManager();
+
+  /**
+   * Contains the list of datamaps for each table, keyed by AbsoluteTableIdentifier#uniqueName().
+   */
+  private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>();
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
+
+  private DataMapStoreManager() {
+    // singleton, obtain the instance through getInstance()
+  }
+  /** Returns the datamaps registered for the table, or null when none has been registered. */
+  public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
+    return allDataMaps.get(identifier.uniqueName());
+  }
+
+  /**
+   * Get the datamap for reading data; it is created and registered if the table has none yet.
+   * @param identifier table the datamap belongs to
+   * @param dataMapName name of the datamap to look up
+   * @param factoryClass factory used to create the datamap when the table has no datamaps yet
+   * @return the datamap; throws RuntimeException if the table has datamaps but none matches
+   */
+  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+      Class<? extends DataMapFactory> factoryClass) {
+    String table = identifier.uniqueName();
+    List<TableDataMap> tableDataMaps = allDataMaps.get(table);
+    TableDataMap dataMap;
+    if (tableDataMaps == null) {
+      dataMap = createAndRegisterDataMap(identifier, factoryClass, dataMapName);
+    } else {
+      dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    }
+    if (dataMap == null) {
+      throw new RuntimeException("Datamap does not exist");
+    }
+    return dataMap;
+  }
+
+  /**
+   * Create a new datamap, register it in the store manager and return it.
+   * Throws RuntimeException if the name is already registered for the table or creation fails.
+   */
+  public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
+      Class<? extends DataMapFactory> factoryClass, String dataMapName) {
+    String table = identifier.uniqueName();
+    List<TableDataMap> tableDataMaps = allDataMaps.get(table);
+    if (tableDataMaps == null) {
+      tableDataMaps = new ArrayList<>();
+      allDataMaps.put(table, tableDataMaps);
+    }
+    TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    if (dataMap != null) {
+      throw new RuntimeException("Already datamap exists in that path with type " + dataMapName);
+    }
+
+    try {
+      DataMapFactory dataMapFactory = factoryClass.newInstance();
+      dataMapFactory.init(identifier, dataMapName);
+      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+    tableDataMaps.add(dataMap);
+    return dataMap;
+  }
+
+  private TableDataMap getAbstractTableDataMap(String dataMapName,
+      List<TableDataMap> tableDataMaps) {
+    TableDataMap dataMap = null;
+    for (TableDataMap tableDataMap: tableDataMaps) {
+      if (tableDataMap.getDataMapName().equals(dataMapName)) {
+        dataMap = tableDataMap;
+        break;
+      }
+    }
+    return dataMap;
+  }
+
+  /**
+   * Clear the named datamap of the given table from memory and unregister it; no-op if absent.
+   * @param identifier table the datamap belongs to
+   * @param dataMapName name of the datamap to clear
+   */
+  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+    // allDataMaps is keyed by the unique table name; looking it up with the identifier object
+    // itself compiles (Map#get takes Object) but never matches, so nothing would be cleared.
+    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    if (tableDataMaps != null) {
+      for (TableDataMap tableDataMap: tableDataMaps) {
+        if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+          tableDataMap.clear();
+          tableDataMaps.remove(tableDataMap);
+          break; // stop iterating right after the structural change to the list
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the singleton instance
+   * @return the only DataMapStoreManager instance
+   */
+  public static DataMapStoreManager getInstance() {
+    return instance;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
new file mode 100644
index 0000000..b55c5d9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.events.EventListener;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * DataMap at the table level, user can add any number of datamaps for one table. Depending
+ * on the filter condition, it can prune the blocklets.
+ */
+public final class TableDataMap implements EventListener {
+
+ private AbsoluteTableIdentifier identifier;
+
+ private String dataMapName;
+
+ private DataMapFactory dataMapFactory;
+
+ /**
+ * Create the table-level datamap; this only stores the arguments, nothing is loaded here.
+ */
+ public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+ DataMapFactory dataMapFactory) {
+ this.identifier = identifier;
+ this.dataMapName = dataMapName;
+ this.dataMapFactory = dataMapFactory;
+ }
+
+ /**
+ * Pass the valid segments and prune the datamap using filter expression
+ *
+ * @param segmentIds ids of the segments whose datamaps are pruned
+ * @param filterExp filter passed to DataMap#prune for every datamap of each segment
+ * @return blocklets returned by the datamaps, each tagged with the segment id it came from
+ */
+ public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp)
+ throws IOException {
+ List<Blocklet> blocklets = new ArrayList<>();
+ for (String segmentId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+ for (DataMap dataMap : dataMaps) {
+ List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+ blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+ }
+ }
+ return blocklets;
+ }
+
+ private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+ for (Blocklet blocklet : pruneBlocklets) {
+ blocklet.setSegmentId(segmentId);
+ }
+ return pruneBlocklets;
+ }
+
+ /**
+ * This is used for making the datamap distributable.
+ * It takes the valid segments and returns all the datamaps as distributable objects so that
+ * it can be distributed across machines.
+ * @param segmentIds ids of the segments whose datamaps are converted
+ * @return one distributable per datamap of the given segments
+ */
+ public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException {
+ List<DataMapDistributable> distributables = new ArrayList<>();
+ for (String segmentsId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
+ for (DataMap dataMap : dataMaps) {
+ distributables.add(dataMap.toDistributable());
+ }
+ }
+ return distributables;
+ }
+
+ /**
+ * This method is used from any machine after it is distributed. It takes the distributable object
+ * to prune the filters.
+ *
+ * @param distributable resolved to a datamap through DataMapFactory#getDataMap
+ * @param filterExp filter expression to prune with
+ * @return blocklets returned by that datamap's prune (no segment id is set on them here)
+ */
+ public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+ return dataMapFactory.getDataMap(distributable).prune(filterExp);
+ }
+
+ @Override public void fireEvent(ChangeEvent event) {
+ dataMapFactory.fireEvent(event);
+ }
+
+ /**
+ * Clear only the datamaps of the segments
+ * @param segmentIds ids of the segments to clear, each passed to DataMapFactory#clear(String)
+ */
+ public void clear(List<String> segmentIds) {
+ for (String segmentId: segmentIds) {
+ dataMapFactory.clear(segmentId);
+ }
+ }
+
+ /**
+ * Clears all datamap
+ */
+ public void clear() {
+ dataMapFactory.clear();
+ }
+ /**
+ * Get the unique name of datamap
+ *
+ * @return the datamap name given to the constructor
+ */
+ public String getDataMapName() {
+ return dataMapName;
+ }
+
+ public DataMapFactory getDataMapFactory() {
+ return dataMapFactory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
new file mode 100644
index 0000000..526572a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * Datamap is an entity which can store and retrieve index data.
+ */
+public interface DataMap {
+
+ /**
+ * It is called to load the data map to memory or to initialize it from the given path.
+ */
+ void init(String path) throws MemoryException, IOException;
+
+ /**
+ * Prune the datamap with filter expression. It returns the list of
+ * blocklets where these filters can exist.
+ *
+ * @param filterExp filter expression to prune with
+ * @return blocklets where the filter can match
+ */
+ List<Blocklet> prune(FilterResolverIntf filterExp);
+
+ /**
+ * Convert datamap to distributable object
+ * @return distributable form of this datamap
+ */
+ DataMapDistributable toDistributable();
+
+ /**
+ * Clear complete index table and release memory.
+ */
+ void clear();
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
new file mode 100644
index 0000000..873457c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for datamap factory, it is responsible for creating the datamap.
+ */
+public interface DataMapFactory {
+
+ /**
+ * Initialization of Datamap factory with the identifier and datamap name
+ */
+ void init(AbsoluteTableIdentifier identifier, String dataMapName);
+
+ /**
+ * Return a new writer for this datamap, for the given segment
+ */
+ DataMapWriter createWriter(String segmentId);
+
+ /**
+ * Get the datamaps of the given segment id
+ */
+ List<DataMap> getDataMaps(String segmentId) throws IOException;
+
+ /**
+ * Get datamap for distributable object.
+ */
+ DataMap getDataMap(DataMapDistributable distributable);
+
+ /**
+ * Handle a change event; what happens is up to the implementation.
+ * @param event the change event
+ */
+ void fireEvent(ChangeEvent event);
+
+ /**
+ * Clears datamap of the segment
+ */
+ void clear(String segmentId);
+
+ /**
+ * Clear all datamaps from memory
+ */
+ void clear();
+
+ /**
+ * Return metadata of this datamap
+ */
+ DataMapMeta getMeta();
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
new file mode 100644
index 0000000..28163d7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ * Data Map writer, notified of block, blocklet and page events
+ */
+public interface DataMapWriter {
+
+ /**
+ * Start of new block notification.
+ * @param blockId file name of the carbondata file
+ */
+ void onBlockStart(String blockId);
+
+ /**
+ * End of block notification (blockId presumably as in onBlockStart; confirm)
+ */
+ void onBlockEnd(String blockId);
+
+ /**
+ * Start of new blocklet notification.
+ * @param blockletId sequence number of blocklet in the block
+ */
+ void onBlockletStart(int blockletId);
+
+ /**
+ * End of blocklet notification
+ * @param blockletId sequence number of blocklet in the block
+ */
+ void onBlockletEnd(int blockletId);
+
+ /**
+ * Add a page of column data to the datamap; the order of pages is the same as
+ * `indexedColumns` in the DataMapMeta returned by DataMapFactory#getMeta().
+ *
+ * Implementation should copy the content of `pages` as needed, because `pages` memory
+ * may be freed after this method returns, if using unsafe column page.
+ */
+ void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
index ea9c373..0aac1d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
@@ -42,9 +42,6 @@ public class EncodedTablePage {
// number of row in this page
private int pageSize;
- // true if it is last page of all input rows
- private boolean isLastPage;
-
// size in bytes of all encoded columns (including data and metadate)
private int encodedSize;
@@ -128,14 +125,6 @@ public class EncodedTablePage {
return pageKey;
}
- public boolean isLastPage() {
- return isLastPage;
- }
-
- public void setIsLastPage(boolean isWriteAll) {
- this.isLastPage = isWriteAll;
- }
-
public EncodedMeasurePage getMeasure(int measureIndex) {
return measures[measureIndex];
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index fc8c273..9d4af7b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -26,8 +26,8 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.memory.MemoryException;
/**
* Class to handle loading, unloading,clearing,storing of the table
@@ -73,10 +73,9 @@ public class BlockletDataMapIndexStore
if (dataMap == null) {
try {
dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
- } catch (IndexBuilderException e) {
- throw new IOException(e.getMessage(), e);
- } catch (Throwable e) {
- throw new IOException("Problem in loading segment block.", e);
+ } catch (MemoryException e) {
+ LOGGER.error("memory exception when loading datamap: " + e.getMessage());
+ throw new RuntimeException(e.getMessage(), e);
}
}
return dataMap;
@@ -93,6 +92,7 @@ public class BlockletDataMapIndexStore
for (BlockletDataMap dataMap : blockletDataMaps) {
dataMap.clear();
}
+ e.printStackTrace();
throw new IOException("Problem in loading segment blocks.", e);
}
return blockletDataMaps;
@@ -130,7 +130,8 @@ public class BlockletDataMapIndexStore
* @throws IOException
*/
private BlockletDataMap loadAndGetDataMap(
- TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+ TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+ throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
deleted file mode 100644
index 1276494..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.List;
-
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
-/**
- * Datamap is an entity which can store and retrieve index data.
- */
-public interface DataMap {
-
- /**
- * Give the writer to write the data.
- *
- * @return
- */
- DataMapWriter getWriter();
-
- /**
- * It is called to load the data map to memory or to initialize it.
- */
- void init(String path);
-
- /**
- * Prune the datamap with filter expression. It returns the list of
- * blocklets where these filters can exist.
- *
- * @param filterExp
- * @return
- */
- List<Blocklet> prune(FilterResolverIntf filterExp);
-
- /**
- * Convert datamap to distributable object
- * @return
- */
- DataMapDistributable toDistributable();
-
- /**
- * Clear complete index table and release memory.
- */
- void clear();
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
deleted file mode 100644
index 4c379f3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-
-/**
- * Distributable class for datamap.
- */
-public abstract class DataMapDistributable implements Distributable {
-
- private String tablePath;
-
- private String segmentId;
-
- private String dataMapName;
-
- public String getTablePath() {
- return tablePath;
- }
-
- public void setTablePath(String tablePath) {
- this.tablePath = tablePath;
- }
-
- public String getSegmentId() {
- return segmentId;
- }
-
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- public String getDataMapName() {
- return dataMapName;
- }
-
- public void setDataMapName(String dataMapName) {
- this.dataMapName = dataMapName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
deleted file mode 100644
index 72f714f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.List;
-
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
-/**
- * Interface for datamap factory, it is responsible for creating the datamap.
- */
-public interface DataMapFactory {
-
- /**
- * Initialization of Datamap factory
- * @param identifier
- * @param dataMapName
- */
- void init(AbsoluteTableIdentifier identifier, String dataMapName);
- /**
- * Get the datamap writer for each segmentid.
- *
- * @param identifier
- * @param segmentId
- * @return
- */
- DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
- String segmentId);
-
- /**
- * Get the datamap for segmentid
- *
- * @param segmentId
- * @return
- */
- List<DataMap> getDataMaps(String segmentId);
-
- /**
- * Get datamap for distributable object.
- *
- * @param distributable
- * @return
- */
- DataMap getDataMap(DataMapDistributable distributable);
-
- /**
- * This method checks whether the columns and the type of filters supported
- * for this datamap or not
- *
- * @param filterType
- * @return
- */
- boolean isFiltersSupported(FilterType filterType);
-
- /**
- *
- * @param event
- */
- void fireEvent(ChangeEvent event);
-
- /**
- * Clears datamap of the segment
- */
- void clear(String segmentId);
-
- /**
- * Clear all datamaps from memory
- */
- void clear();
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
deleted file mode 100644
index 1664a6a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
-/**
- * It maintains all the DataMaps in it.
- */
-public final class DataMapStoreManager {
-
- private static DataMapStoreManager instance = new DataMapStoreManager();
-
- /**
- * Contains the list of datamaps for each table.
- */
- private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>();
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
-
- private DataMapStoreManager() {
-
- }
-
- /**
- * Get the datamap for reading data.
- *
- * @param dataMapName
- * @param mapType
- * @return
- */
- public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
- DataMapType mapType) {
- List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
- TableDataMap dataMap;
- if (tableDataMaps == null) {
- createTableDataMap(identifier, mapType, dataMapName);
- tableDataMaps = dataMapMappping.get(identifier);
- }
- dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
- if (dataMap == null) {
- throw new RuntimeException("Datamap does not exist");
- }
- return dataMap;
- }
-
- /**
- * Create new datamap instance using datamap name, datamap type and table identifier
- *
- * @param mapType
- * @return
- */
- private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
- DataMapType mapType, String dataMapName) {
- List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
- if (tableDataMaps == null) {
- tableDataMaps = new ArrayList<>();
- dataMapMappping.put(identifier, tableDataMaps);
- }
- TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
- if (dataMap != null) {
- throw new RuntimeException("Already datamap exists in that path with type " + mapType);
- }
-
- try {
- DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
- dataMapFactory.init(identifier, dataMapName);
- dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
- } catch (Exception e) {
- LOGGER.error(e);
- throw new RuntimeException(e);
- }
- tableDataMaps.add(dataMap);
- return dataMap;
- }
-
- private TableDataMap getAbstractTableDataMap(String dataMapName,
- List<TableDataMap> tableDataMaps) {
- TableDataMap dataMap = null;
- for (TableDataMap tableDataMap: tableDataMaps) {
- if (tableDataMap.getDataMapName().equals(dataMapName)) {
- dataMap = tableDataMap;
- break;
- }
- }
- return dataMap;
- }
-
- /**
- * Clear the datamap/datamaps of a mentioned datamap name and table from memory
- * @param identifier
- * @param dataMapName
- */
- public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
- List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
- if (tableDataMaps != null) {
- int i = 0;
- for (TableDataMap tableDataMap: tableDataMaps) {
- if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
- tableDataMap.clear();
- tableDataMaps.remove(i);
- break;
- }
- i++;
- }
- }
- }
-
- /**
- * Returns the singleton instance
- * @return
- */
- public static DataMapStoreManager getInstance() {
- return instance;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
deleted file mode 100644
index 0059b29..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
-
-/**
- * Datamap type
- */
-public enum DataMapType {
- BLOCKLET(BlockletDataMapFactory.class);
-
- private Class<? extends DataMapFactory> classObject;
-
- DataMapType(Class<? extends DataMapFactory> classObject) {
- this.classObject = classObject;
- }
-
- public Class<? extends DataMapFactory> getClassObject() {
- return classObject;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
deleted file mode 100644
index bd8be09..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.io.DataOutput;
-
-/**
- * Data Map writer
- */
-public interface DataMapWriter<T> {
-
- /**
- * Initialize the data map writer with output stream
- *
- * @param outStream
- */
- void init(DataOutput outStream);
-
- /**
- * Add the index row to the in-memory store.
- */
- void writeData(T data);
-
- /**
- * Get the added row count
- *
- * @return
- */
- int getRowCount();
-
- /**
- * Finish writing of data map table, otherwise it will not be allowed to read.
- */
- void finish();
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
deleted file mode 100644
index 39ca4c5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.events.EventListener;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-/**
- * DataMap at the table level, user can add any number of datamaps for one table. Depends
- * on the filter condition it can prune the blocklets.
- */
-public final class TableDataMap implements EventListener {
-
- private AbsoluteTableIdentifier identifier;
-
- private String dataMapName;
-
- private DataMapFactory dataMapFactory;
-
- /**
- * It is called to initialize and load the required table datamap metadata.
- */
- public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
- DataMapFactory dataMapFactory) {
- this.identifier = identifier;
- this.dataMapName = dataMapName;
- this.dataMapFactory = dataMapFactory;
- }
-
- /**
- * Pass the valid segments and prune the datamap using filter expression
- *
- * @param segmentIds
- * @param filterExp
- * @return
- */
- public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
- List<Blocklet> blocklets = new ArrayList<>();
- for (String segmentId : segmentIds) {
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
- for (DataMap dataMap : dataMaps) {
- List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
- blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
- }
- }
- return blocklets;
- }
-
- private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
- for (Blocklet blocklet : pruneBlocklets) {
- blocklet.setSegmentId(segmentId);
- }
- return pruneBlocklets;
- }
-
- /**
- * This is used for making the datamap distributable.
- * It takes the valid segments and returns all the datamaps as distributable objects so that
- * it can be distributed across machines.
- *
- * @return
- */
- public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
- List<DataMapDistributable> distributables = new ArrayList<>();
- for (String segmentsId : segmentIds) {
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
- for (DataMap dataMap : dataMaps) {
- distributables.add(dataMap.toDistributable());
- }
- }
- return distributables;
- }
-
- /**
- * This method is used from any machine after it is distributed. It takes the distributable object
- * to prune the filters.
- *
- * @param distributable
- * @param filterExp
- * @return
- */
- public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
- return dataMapFactory.getDataMap(distributable).prune(filterExp);
- }
-
- @Override public void fireEvent(ChangeEvent event) {
- dataMapFactory.fireEvent(event);
- }
-
- /**
- * Clear only the datamaps of the segments
- * @param segmentIds
- */
- public void clear(List<String> segmentIds) {
- for (String segmentId: segmentIds) {
- dataMapFactory.clear(segmentId);
- }
- }
-
- /**
- * Clears all datamap
- */
- public void clear() {
- dataMapFactory.clear();
- }
- /**
- * Get the unique name of datamap
- *
- * @return
- */
- public String getDataMapName() {
- return dataMapName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 4b5be11..2e82c46 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -31,14 +31,13 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import org.apache.carbondata.core.indexstore.DataMap;
-import org.apache.carbondata.core.indexstore.DataMapDistributable;
-import org.apache.carbondata.core.indexstore.DataMapWriter;
import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -64,6 +63,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+ public static final String NAME = "clustered.btree.blocklet";
+
private static int KEY_INDEX = 0;
private static int MIN_VALUES_INDEX = 1;
@@ -88,31 +89,23 @@ public class BlockletDataMap implements DataMap, Cacheable {
private int[] columnCardinality;
- @Override public DataMapWriter getWriter() {
- return null;
- }
-
- @Override public void init(String path) {
+ @Override public void init(String path) throws IOException, MemoryException {
DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
- try {
- List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
- for (DataFileFooter fileFooter : indexInfo) {
- List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
- if (segmentProperties == null) {
- columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
- segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
- createSchema(segmentProperties);
- }
- TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
- fileFooter = CarbonUtil.readMetadatFile(blockInfo);
-
- loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
- }
- if (unsafeMemoryDMStore != null) {
- unsafeMemoryDMStore.finishWriting();
+ List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+ for (DataFileFooter fileFooter : indexInfo) {
+ List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+ if (segmentProperties == null) {
+ columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+ segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+ createSchema(segmentProperties);
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+ TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+ fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+ loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+ }
+ if (unsafeMemoryDMStore != null) {
+ unsafeMemoryDMStore.finishWriting();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2fe6643..e189931 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -25,16 +25,16 @@ import java.util.Map;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.DataMap;
-import org.apache.carbondata.core.indexstore.DataMapDistributable;
-import org.apache.carbondata.core.indexstore.DataMapFactory;
-import org.apache.carbondata.core.indexstore.DataMapWriter;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
/**
@@ -44,21 +44,25 @@ public class BlockletDataMapFactory implements DataMapFactory {
private AbsoluteTableIdentifier identifier;
+ // segmentId -> identifiers of the index files belonging to that segment
private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+ @Override
public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
this.identifier = identifier;
cache = CacheProvider.getInstance()
.createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
}
- public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
- return null;
+ @Override
+ public DataMapWriter createWriter(String segmentId) {
+ throw new UnsupportedOperationException("not implemented");
}
- public List<DataMap> getDataMaps(String segmentId) {
+ @Override
+ public List<DataMap> getDataMaps(String segmentId) throws IOException {
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segmentId);
if (tableBlockIndexUniqueIdentifiers == null) {
@@ -77,17 +81,10 @@ public class BlockletDataMapFactory implements DataMapFactory {
}
}
- try {
- return cache.getAll(tableBlockIndexUniqueIdentifiers);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override public boolean isFiltersSupported(FilterType filterType) {
- return true;
+ return cache.getAll(tableBlockIndexUniqueIdentifiers);
}
+ @Override
public void clear(String segmentId) {
List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
if (blockIndexes != null) {
@@ -99,17 +96,26 @@ public class BlockletDataMapFactory implements DataMapFactory {
}
}
- @Override public void clear() {
+ @Override
+ public void clear() {
for (String segmentId: segmentMap.keySet()) {
clear(segmentId);
}
}
- @Override public DataMap getDataMap(DataMapDistributable distributable) {
+ @Override
+ public DataMap getDataMap(DataMapDistributable distributable) {
return null;
}
- @Override public void fireEvent(ChangeEvent event) {
+ @Override
+ public void fireEvent(ChangeEvent event) {
}
+
+ @Override
+ public DataMapMeta getMeta() {
+ // TODO: pass SORT_COLUMNS into this class
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 22faaf2..31ad03b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -156,4 +156,8 @@ public class AbsoluteTableIdentifier implements Serializable {
}
return true;
}
+
+ public String uniqueName() {
+ return storePath + "/" + carbonTableIdentifier.toString().toLowerCase();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
index 31a0b23..cc65d9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
@@ -128,9 +128,9 @@ public class CarbonTableIdentifier implements Serializable {
return true;
}
- /*
- * @return table unidque name
- */
+ /**
+ * @return the unique table name, formed as databaseName + '_' + tableName
+ */
@Override public String toString() {
return databaseName + '_' + tableName;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index edc4c28..15512a8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1950,5 +1950,11 @@ public final class CarbonUtil {
throw new IllegalArgumentException("Invalid data type: " + meta.getType());
}
}
+
+ public static void requireNotNull(Object obj) {
+ if (obj == null) {
+ throw new IllegalArgumentException("parameter not be null");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 54ad18b..19e264b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -30,11 +30,12 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.DataMapStoreManager;
-import org.apache.carbondata.core.indexstore.DataMapType;
-import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -246,7 +247,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
@Override public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
TableDataMap blockletMap =
- DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
+ BlockletDataMapFactory.class);
List<String> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
@@ -403,7 +405,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
TableDataMap blockletMap = DataMapStoreManager.getInstance()
- .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+ .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
@@ -549,8 +551,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
*/
public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
throws IOException, KeyGenException {
- TableDataMap blockletMap =
- DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ TableDataMap blockletMap = DataMapStoreManager.getInstance()
+ .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
new SegmentStatusManager(identifier).getValidAndInvalidSegments();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
deleted file mode 100644
index 76c7e6f..0000000
--- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.integration.spark.load;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.spark.load.CarbonLoaderUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test class to test block distribution functionality
- */
-public class CarbonLoaderUtilTest {
- List<Distributable> blockInfos = null;
- int noOfNodesInput = -1;
- List<String> activeNode = null;
- Map<String, List<Distributable>> expected = null;
- Map<String, List<Distributable>> mapOfNodes = null;
-
- @Test public void nodeBlockMapping() throws Exception {
-
- // scenario when the 3 nodes and 3 executors
- initSet1();
- Map<String, List<Distributable>> mapOfNodes =
- CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
- // node allocation
- Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
- // block allocation
- boolean isEqual = compareResult(expected, mapOfNodes);
- Assert.assertTrue("Block Allocation", isEqual);
-
- // 2 node and 3 executors
- initSet2();
- mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
- // node allocation
- Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
- // block allocation
- isEqual = compareResult(expected, mapOfNodes);
- Assert.assertTrue("Block Allocation", isEqual);
-
- // 3 data node and 2 executors
- initSet3();
- mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
- // node allocation
- Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
- // block allocation
- isEqual = compareResult(expected, mapOfNodes);
- Assert.assertTrue("Block Allocation", isEqual);
- }
-
- /**
- * compares the blocks allocation
- *
- * @param expectedResult
- * @param actualResult
- * @return
- */
- private boolean compareResult(Map<String, List<Distributable>> expectedResult,
- Map<String, List<Distributable>> actualResult) {
- expectedResult = sortByListSize(expectedResult);
- actualResult = sortByListSize(actualResult);
- List<List<Distributable>> expectedList = new LinkedList(expectedResult.entrySet());
- List<List<Distributable>> mapOfNodesList = new LinkedList(actualResult.entrySet());
- boolean isEqual = expectedList.size() == mapOfNodesList.size();
- if (isEqual) {
- for (int i = 0; i < expectedList.size(); i++) {
- int size1 = ((List) ((Map.Entry) (expectedList.get(i))).getValue()).size();
- int size2 = ((List) ((Map.Entry) (mapOfNodesList.get(i))).getValue()).size();
- isEqual = size1 == size2;
- if (!isEqual) {
- break;
- }
- }
- }
- return isEqual;
- }
-
- /**
- * sort by list size
- *
- * @param map
- * @return
- */
- private static Map<String, List<Distributable>> sortByListSize(
- Map<String, List<Distributable>> map) {
- List<List<Distributable>> list = new LinkedList(map.entrySet());
- Collections.sort(list, new Comparator() {
- public int compare(Object obj1, Object obj2) {
- if (obj1 == null && obj2 == null) {
- return 0;
- } else if (obj1 == null) {
- return 1;
- } else if (obj2 == null) {
- return -1;
- }
- int size1 = ((List) ((Map.Entry) (obj1)).getValue()).size();
- int size2 = ((List) ((Map.Entry) (obj2)).getValue()).size();
- return size2 - size1;
- }
- });
-
- Map res = new LinkedHashMap();
- for (Iterator it = list.iterator(); it.hasNext(); ) {
- Map.Entry entry = (Map.Entry) it.next();
- res.put(entry.getKey(), entry.getValue());
- }
- return res;
- }
-
- void initSet1() {
- blockInfos = new ArrayList<>();
- activeNode = new ArrayList<>();
- activeNode.add("node-7");
- activeNode.add("node-9");
- activeNode.add("node-11");
- String[] location = { "node-7", "node-9", "node-11" };
- blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
- expected = new HashMap<>();
- expected.put("node-7", blockInfos.subList(0, 2));
- expected.put("node-9", blockInfos.subList(2, 4));
- expected.put("node-11", blockInfos.subList(4, 6));
- }
-
- void initSet2() {
- blockInfos = new ArrayList<>();
- activeNode = new ArrayList<>();
- activeNode.add("node-7");
- activeNode.add("node-9");
- activeNode.add("node-11");
- String[] location = { "node-7", "node-11" };
- blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
- expected = new HashMap<>();
- expected.put("node-7", blockInfos.subList(0, 2));
- expected.put("node-9", blockInfos.subList(2, 4));
- expected.put("node-11", blockInfos.subList(4, 6));
- }
-
- void initSet3() {
- blockInfos = new ArrayList<>();
- activeNode = new ArrayList<>();
- activeNode.add("node-7");
- activeNode.add("node-11");
- String[] location = { "node-7", "node-9", "node-11" };
- blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
- blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
- expected = new HashMap<>();
- expected.put("node-7", blockInfos.subList(0, 3));
- expected.put("node-11", blockInfos.subList(3, 6));
- }
-
-
- /**
- * Test case with 4 blocks and 4 nodes with 3 replication.
- *
- * @throws Exception
- */
- @Test public void nodeBlockMapping() throws Exception {
-
- Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
- TableBlockInfo block1 =
- new TableBlockInfo("path1", 123, "1", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block2 =
- new TableBlockInfo("path2", 123, "2", new String[] { "2", "3", "4" }, 111);
- TableBlockInfo block3 =
- new TableBlockInfo("path3", 123, "3", new String[] { "3", "4", "1" }, 111);
- TableBlockInfo block4 =
- new TableBlockInfo("path4", 123, "4", new String[] { "1", "2", "4" }, 111);
-
- inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block2, Arrays.asList(new String[]{"2","3","4"}));
- inputMap.put(block3, Arrays.asList(new String[]{"3","4","1"}));
- inputMap.put(block4, Arrays.asList(new String[]{"1","2","4"}));
-
- List<TableBlockInfo> inputBlocks = new ArrayList(6);
- inputBlocks.add(block1);
- inputBlocks.add(block2);
- inputBlocks.add(block3);
- inputBlocks.add(block4);
-
- Map<String, List<TableBlockInfo>> outputMap
- = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
- Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 4, 4));
-
- Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 4, 4));
- }
-
- private boolean calculateBlockLocality(Map<TableBlockInfo, List<String>> inputMap,
- Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) {
-
- double notInNodeLocality = 0;
- for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) {
-
- List<TableBlockInfo> blockListOfANode = entry.getValue();
-
- for (TableBlockInfo eachBlock : blockListOfANode) {
-
- // for each block check the node locality
-
- List<String> blockLocality = inputMap.get(eachBlock);
- if (!blockLocality.contains(entry.getKey())) {
- notInNodeLocality++;
- }
- }
- }
-
- System.out.println(
- ((notInNodeLocality / numberOfBlocks) * 100) + " " + "is the node locality mismatch");
- if ((notInNodeLocality / numberOfBlocks) * 100 > 30) {
- return false;
- }
- return true;
- }
-
- private boolean calculateBlockDistribution(Map<TableBlockInfo, List<String>> inputMap,
- Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) {
-
- int nodesPerBlock = numberOfBlocks / numberOfNodes;
-
- for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) {
-
- if (entry.getValue().size() < nodesPerBlock) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Test case with 5 blocks and 3 nodes
- *
- * @throws Exception
- */
- @Test public void nodeBlockMappingTestWith5blocks3nodes() throws Exception {
-
- Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
- TableBlockInfo block1 =
- new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block2 =
- new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block3 =
- new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block4 =
- new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block5 =
- new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111);
-
- inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"}));
-
- List<TableBlockInfo> inputBlocks = new ArrayList(6);
- inputBlocks.add(block1);
- inputBlocks.add(block2);
- inputBlocks.add(block3);
- inputBlocks.add(block4);
- inputBlocks.add(block5);
-
- Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 3);
-
- Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 5, 3));
-
- Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 5, 3));
-
- }
-
- /**
- * Test case with 6 blocks and 4 nodes where 4 th node doesnt have any local data.
- *
- * @throws Exception
- */
- @Test public void nodeBlockMappingTestWith6Blocks4nodes() throws Exception {
-
- Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
- TableBlockInfo block1 =
- new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block2 =
- new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block3 =
- new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block4 =
- new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block5 =
- new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111);
- TableBlockInfo block6 =
- new TableBlockInfo("part-5-0-1462341987000", 123, "6", new String[] { "1", "2", "3" }, 111);
-
- inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"}));
- inputMap.put(block6, Arrays.asList(new String[]{"1","2","3"}));
-
-
- List<TableBlockInfo> inputBlocks = new ArrayList(6);
- inputBlocks.add(block1);
- inputBlocks.add(block2);
- inputBlocks.add(block3);
- inputBlocks.add(block4);
- inputBlocks.add(block5);
- inputBlocks.add(block6);
-
- Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
- Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 6, 4));
-
- Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 6, 4));
-
- }
-
- /**
- * Test case with 10 blocks and 4 nodes with 10,60,30 % distribution
- *
- * @throws Exception
- */
- @Test public void nodeBlockMappingTestWith10Blocks4nodes() throws Exception {
-
- Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
- TableBlockInfo block1 =
- new TableBlockInfo("part-1-0-1462341987000", 123, "1", new String[] { "2", "4" }, 111);
- TableBlockInfo block2 =
- new TableBlockInfo("part-2-0-1462341987000", 123, "2", new String[] { "2", "4" }, 111);
- TableBlockInfo block3 =
- new TableBlockInfo("part-3-0-1462341987000", 123, "3", new String[] { "2", "4" }, 111);
- TableBlockInfo block4 =
- new TableBlockInfo("part-4-0-1462341987000", 123, "4", new String[] { "2", "4" }, 111);
- TableBlockInfo block5 =
- new TableBlockInfo("part-5-0-1462341987000", 123, "5", new String[] { "2", "4" }, 111);
- TableBlockInfo block6 =
- new TableBlockInfo("part-6-0-1462341987000", 123, "6", new String[] { "2", "4" }, 111);
- TableBlockInfo block7 =
- new TableBlockInfo("part-7-0-1462341987000", 123, "7", new String[] { "3", "4" }, 111);
- TableBlockInfo block8 =
- new TableBlockInfo("part-8-0-1462341987000", 123, "8", new String[] { "3", "4" }, 111);
- TableBlockInfo block9 =
- new TableBlockInfo("part-9-0-1462341987000", 123, "9", new String[] { "3", "4" }, 111);
- TableBlockInfo block10 =
- new TableBlockInfo("part-10-0-1462341987000", 123, "9", new String[] { "1", "4" }, 111);
-
- inputMap.put(block1, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block2, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block3, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block4, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block5, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block6, Arrays.asList(new String[]{"2","4"}));
- inputMap.put(block7, Arrays.asList(new String[]{"3","4"}));
- inputMap.put(block8, Arrays.asList(new String[]{"3","4"}));
- inputMap.put(block9, Arrays.asList(new String[]{"3","4"}));
- inputMap.put(block10, Arrays.asList(new String[]{"1","4"}));
-
- List<TableBlockInfo> inputBlocks = new ArrayList(6);
- inputBlocks.add(block1);
- inputBlocks.add(block2);
- inputBlocks.add(block3);
- inputBlocks.add(block4);
- inputBlocks.add(block5);
- inputBlocks.add(block6);
- inputBlocks.add(block7);
- inputBlocks.add(block8);
- inputBlocks.add(block9);
- inputBlocks.add(block10);
-
- Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
- Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 10, 4));
-
- Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 10, 4));
- }
-
-}
\ No newline at end of file