You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/13 20:01:19 UTC
[2/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
deleted file mode 100644
index 5837f0c..0000000
--- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.integration.spark.testsuite.validation;
-
-import org.apache.spark.sql.common.util.CarbonHiveContext;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.reader.CarbonFooterReader;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.format.BlockletIndex;
-import org.apache.carbondata.format.BlockletInfo;
-import org.apache.carbondata.format.DataChunk;
-import org.apache.carbondata.format.Encoding;
-import org.apache.carbondata.format.FileFooter;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class FileFooterValidator {
-
- private static FileFooter fileFooter;
-
- private static boolean setUpIsDone;
-
- @Before public void setUp() throws Exception {
-
- if (setUpIsDone) {
- return;
- }
- CarbonHiveContext.sql(
- "CREATE CUBE validatefooter DIMENSIONS (empno Integer, empname String,"
- + " designation String,"
- + " doj Timestamp, workgroupcategory Integer, workgroupcategoryname String, "
- + "deptno Integer, deptname String, projectcode Integer, projectjoindate Timestamp,"
- + " projectenddate Timestamp) MEASURES (attendance Integer,utilization Integer,"
- + "salary Integer) OPTIONS (PARTITIONER [PARTITION_COUNT=1])");
- CarbonHiveContext.sql(
- "LOAD DATA fact from './src/test/resources/data.csv' INTO CUBE validatefooter "
- + "PARTITIONDATA(DELIMITER ',', QUOTECHAR '\"')");
- String storePath =
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
- CarbonTableIdentifier tableIdentifier =
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "validatefooter", "1");
- String segmentPath = CarbonStorePath.getCarbonTablePath(storePath, tableIdentifier)
- .getCarbonDataDirectoryPath("0", "0");
- CarbonFile carbonFile =
- FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
- CarbonFile[] list = carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile file) {
- if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
- return true;
- }
- return false;
- }
- });
-
- for (CarbonFile file : list) {
- String fileLocation = file.getAbsolutePath();
- CarbonFile factFile =
- FileFactory.getCarbonFile(fileLocation, FileFactory.getFileType(fileLocation));
- long offset = factFile.getSize() - CarbonCommonConstants.LONG_SIZE_IN_BYTE;
- FileHolder fileHolder = FileFactory.getFileHolder(FileFactory.getFileType(fileLocation));
- offset = fileHolder.readLong(fileLocation, offset);
- CarbonFooterReader metaDataReader = new CarbonFooterReader(fileLocation, offset);
- fileFooter = metaDataReader.readFooter();
- }
- setUpIsDone = true;
- }
-
- @AfterClass public static void tearDownAfterClass() {
- CarbonHiveContext.sql("drop CUBE validatefooter");
- }
-
- @Test public void testFileFooterExist() {
- assertTrue(fileFooter != null);
- }
-
- @Test public void testFileFooterVersion() {
- assertTrue(fileFooter.getVersion() >= 0);
- }
-
- @Test public void testFileFooterNumRows() {
- assertTrue(fileFooter.getNum_rows() > 0);
- }
-
- @Test public void testFileFooterTableColumns() {
- assertTrue(fileFooter.getTable_columns() != null && fileFooter.getTable_columns().size() > 0);
- }
-
- @Test public void testFileFooterSegmentInfo() {
- assertTrue(
- fileFooter.getSegment_info() != null && fileFooter.getSegment_info().getNum_cols() > 0
- && fileFooter.getSegment_info().getColumn_cardinalities().size() > 0);
- }
-
- @Test public void testFileFooterBlockletIndex() {
- assertTrue(fileFooter.getBlocklet_index_list() != null
- && fileFooter.getBlocklet_index_list().size() > 0);
- for (BlockletIndex blockletIndex : fileFooter.getBlocklet_index_list()) {
- assertTrue(blockletIndex.getMin_max_index().getMin_values() != null
- && blockletIndex.getMin_max_index().getMin_values().size() > 0
- && blockletIndex.getMin_max_index().getMax_values() != null
- && blockletIndex.getMin_max_index().getMax_values().size() > 0
- && blockletIndex.getMin_max_index().getMin_values().size() == blockletIndex
- .getMin_max_index().getMax_values().size());
- assertTrue(blockletIndex.getB_tree_index().getStart_key() != null
- && blockletIndex.getB_tree_index().getEnd_key() != null);
- }
- }
-
- @Test public void testFileFooterBlockletInfo() {
- assertTrue(fileFooter.getBlocklet_info_list() != null
- && fileFooter.getBlocklet_info_list().size() > 0);
- for (BlockletInfo blockletInfo : fileFooter.getBlocklet_info_list()) {
- assertTrue(blockletInfo.getNum_rows() > 0 && blockletInfo.getColumn_data_chunks() != null
- && blockletInfo.getColumn_data_chunks().size() > 0);
- for (DataChunk columnDataChunk : blockletInfo.getColumn_data_chunks()) {
- testColumnDataChunk(columnDataChunk);
- }
- }
- }
-
- private void testColumnDataChunk(DataChunk columnDatachunk) {
- assertTrue(columnDatachunk.getEncoders() != null && columnDatachunk.getChunk_meta() != null
- && columnDatachunk.getChunk_meta().getCompression_codec() != null);
- // For Measure
- if (columnDatachunk.getEncoders().contains(Encoding.DELTA)) {
- assertTrue(
- columnDatachunk.getPresence() != null && columnDatachunk.getEncoder_meta() != null);
- } else {
- assertTrue(columnDatachunk.getSort_state() != null);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
new file mode 100644
index 0000000..b0e4833
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.events.ChangeEvent
+import org.apache.carbondata.core.indexstore.schema.FilterType
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.util.CarbonProperties
+
+class C2DataMapFactory() extends DataMapFactory {
+
+ override def init(identifier: AbsoluteTableIdentifier,
+ dataMapName: String): Unit = {}
+
+ override def fireEvent(event: ChangeEvent[_]): Unit = ???
+
+ override def clear(segmentId: String): Unit = ???
+
+ override def clear(): Unit = ???
+
+ override def getDataMap(distributable: DataMapDistributable): DataMap = ???
+
+ override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+
+ override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+
+ override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO)
+}
+
+class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+
+ def buildTestData(numRows: Int): DataFrame = {
+ import sqlContext.implicits._
+ sqlContext.sparkContext.parallelize(1 to numRows)
+ .map(x => ("a", "b", x))
+ .toDF("c1", "c2", "c3")
+ }
+
+ def dropTable(): Unit = {
+ sql("DROP TABLE IF EXISTS carbon1")
+ sql("DROP TABLE IF EXISTS carbon2")
+ }
+
+ override def beforeAll {
+ dropTable()
+ }
+
+ test("test write datamap 2 pages") {
+ // register datamap writer
+ DataMapStoreManager.getInstance().createAndRegisterDataMap(
+ AbsoluteTableIdentifier.from(storeLocation, "default", "carbon1"),
+ classOf[C2DataMapFactory],
+ "test")
+
+ val df = buildTestData(33000)
+
+ // save dataframe to carbon file
+ df.write
+ .format("carbondata")
+ .option("tableName", "carbon1")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+ assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+ assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+ "blocklet start 0",
+ "add page data: blocklet 0, page 0",
+ "add page data: blocklet 0, page 1",
+ "blocklet end: 0"
+ ))
+ DataMapWriterSuite.callbackSeq = Seq()
+ }
+
+ test("test write datamap 2 blocklet") {
+ // register datamap writer
+ DataMapStoreManager.getInstance().createAndRegisterDataMap(
+ AbsoluteTableIdentifier.from(storeLocation, "default", "carbon2"),
+ classOf[C2DataMapFactory],
+ "test")
+
+ CarbonProperties.getInstance()
+ .addProperty("carbon.blockletgroup.size.in.mb", "1")
+
+ val df = buildTestData(300000)
+
+ // save dataframe to carbon file
+ df.write
+ .format("carbondata")
+ .option("tableName", "carbon2")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+ assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+ assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+ "blocklet start 0",
+ "add page data: blocklet 0, page 0",
+ "add page data: blocklet 0, page 1",
+ "add page data: blocklet 0, page 2",
+ "add page data: blocklet 0, page 3",
+ "add page data: blocklet 0, page 4",
+ "add page data: blocklet 0, page 5",
+ "add page data: blocklet 0, page 6",
+ "add page data: blocklet 0, page 7",
+ "blocklet end: 0",
+ "blocklet start 1",
+ "add page data: blocklet 1, page 0",
+ "add page data: blocklet 1, page 1",
+ "blocklet end: 1"
+ ))
+ DataMapWriterSuite.callbackSeq = Seq()
+ }
+
+ override def afterAll {
+ dropTable()
+ }
+}
+
+object DataMapWriterSuite {
+ var callbackSeq: Seq[String] = Seq[String]()
+
+ val dataMapWriterC2Mock = new DataMapWriter {
+
+ override def onPageAdded(
+ blockletId: Int,
+ pageId: Int,
+ pages: Array[ColumnPage]): Unit = {
+ assert(pages.length == 1)
+ assert(pages(0).getDataType == DataType.BYTE_ARRAY)
+ val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+ assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+ callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+ }
+
+ override def onBlockletEnd(blockletId: Int): Unit = {
+ callbackSeq :+= s"blocklet end: $blockletId"
+ }
+
+ override def onBlockEnd(blockId: String): Unit = {
+ callbackSeq :+= s"block end $blockId"
+ }
+
+ override def onBlockletStart(blockletId: Int): Unit = {
+ callbackSeq :+= s"blocklet start $blockletId"
+ }
+
+ override def onBlockStart(blockId: String): Unit = {
+ callbackSeq :+= s"block start $blockId"
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index e0829ed..a6a8835 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -256,10 +256,7 @@ object DataManagementFunc {
compactionModel.compactionType
)
- val future: Future[Void] = executor
- .submit(new CompactionCallable(compactionCallableModel
- )
- )
+ val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel))
futureList.add(future)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index a32146a..90f57a9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -31,9 +31,9 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.indexstore.DataMapStoreManager
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 4620db0..1837c04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.indexstore.DataMapStoreManager
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
index e92d06d..697b727 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
@@ -43,6 +43,7 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf
sql("""select c1 from iud_db_sub.dest"""),
Seq(Row("c"), Row("d"), Row("e"))
)
+ sql("drop table if exists iud_db_sub.dest")
}
test("delete data from carbon table[where IN (sub query with where clause) ]") {
@@ -54,10 +55,12 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf
sql("""select c1 from iud_db_sub.dest"""),
Seq(Row("a"), Row("c"), Row("d"), Row("e"))
)
+ sql("drop table if exists iud_db_sub.dest")
}
override def afterAll {
- sql("use default")
+ sql("drop table if exists iud_db_sub.source2")
sql("drop database if exists iud_db_sub cascade")
+ sql("use default")
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
deleted file mode 100644
index 12fe27b..0000000
--- a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-
-/**
- * Generic DataType interface which will be used while data loading for complex types like Array &
- * Struct
- */
-public interface GenericDataType<T> {
-
- /**
- * @return name of the column
- */
- String getName();
-
- /**
- * @return - columns parent name
- */
- String getParentname();
-
- /**
- * @param children - To add children dimension for parent complex type
- */
- void addChildren(GenericDataType children);
-
- /**
- * @param primitiveChild - Returns all primitive type columns in complex type
- */
- void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
-
- /**
- * writes to byte stream
- * @param dataOutputStream
- * @throws IOException
- */
- void writeByteArray(T input, DataOutputStream dataOutputStream)
- throws IOException, DictionaryGenerationException;
-
- /**
- * @return surrogateIndex for primitive column in complex type
- */
- int getSurrogateIndex();
-
- /**
- * @param surrIndex - surrogate index of primitive column in complex type
- */
- void setSurrogateIndex(int surrIndex);
-
- /**
- * converts integer surrogate to bit packed surrogate value
- * @param byteArrayInput
- * @param dataOutputStream
- * @param generator
- * @throws IOException
- * @throws KeyGenException
- */
- void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
- KeyGenerator[] generator) throws IOException, KeyGenException;
-
- /**
- * @return columns count of each complex type
- */
- int getColsCount();
-
- /**
- * @return column uuid string
- */
- String getColumnId();
-
- /**
- * set array index to be referred while creating metadata column
- * @param outputArrayIndex
- */
- void setOutputArrayIndex(int outputArrayIndex);
-
- /**
- * @return array index count of metadata column
- */
- int getMaxOutputArrayIndex();
-
- /**
- * Split byte array into complex metadata column and primitive column
- * @param columnsArray
- * @param inputArray
- */
- void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
-
- /**
- * @return current read row count
- */
- int getDataCounter();
-
- /**
- * fill agg key block including complex types
- * @param aggKeyBlockWithComplex
- * @param aggKeyBlock
- */
- void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
-
- /**
- * fill block key size including complex types
- * @param blockKeySizeWithComplex
- * @param primitiveBlockKeySize
- */
- void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
-
- /**
- * fill cardinality value including complex types
- * @param dimCardWithComplex
- * @param maxSurrogateKeyArray
- */
- void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
-
- /**
- * Fill the cardinality of the primitive datatypes
- * @param dimCardWithComplex
- */
- void fillCardinality(List<Integer> dimCardWithComplex);
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
new file mode 100644
index 0000000..4b0113c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datamap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.processing.store.TablePage;
+
+/**
+ * It is for writing DataMap for one table
+ */
+public class DataMapWriterListener {
+
+ private static final LogService LOG = LogServiceFactory.getLogService(
+ DataMapWriterListener.class.getCanonicalName());
+
+ // list indexed column name -> list of data map writer
+ private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>();
+
+ /**
+ * register all datamap writer for specified table and segment
+ */
+ public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+ List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+ if (tableDataMaps != null) {
+ for (TableDataMap tableDataMap : tableDataMaps) {
+ DataMapFactory factory = tableDataMap.getDataMapFactory();
+ register(factory, segmentId);
+ }
+ }
+ }
+
+ /**
+ * Register a DataMapWriter
+ */
+ private void register(DataMapFactory factory, String segmentId) {
+ assert (factory != null);
+ assert (segmentId != null);
+ DataMapMeta meta = factory.getMeta();
+ if (meta == null) {
+ // if data map does not have meta, no need to register
+ return;
+ }
+ List<String> columns = factory.getMeta().getIndexedColumns();
+ List<DataMapWriter> writers = registry.get(columns);
+ DataMapWriter writer = factory.createWriter(segmentId);
+ if (writers != null) {
+ writers.add(writer);
+ } else {
+ writers = new ArrayList<>();
+ writers.add(writer);
+ registry.put(columns, writers);
+ }
+ LOG.info("DataMapWriter " + writer + " added");
+ }
+
+ public void onBlockStart(String blockId) {
+ for (List<DataMapWriter> writers : registry.values()) {
+ for (DataMapWriter writer : writers) {
+ writer.onBlockStart(blockId);
+ }
+ }
+ }
+
+ public void onBlockEnd(String blockId) {
+ for (List<DataMapWriter> writers : registry.values()) {
+ for (DataMapWriter writer : writers) {
+ writer.onBlockEnd(blockId);
+ }
+ }
+ }
+
+ public void onBlockletStart(int blockletId) {
+ for (List<DataMapWriter> writers : registry.values()) {
+ for (DataMapWriter writer : writers) {
+ writer.onBlockletStart(blockletId);
+ }
+ }
+ }
+
+ public void onBlockletEnd(int blockletId) {
+ for (List<DataMapWriter> writers : registry.values()) {
+ for (DataMapWriter writer : writers) {
+ writer.onBlockletEnd(blockletId);
+ }
+ }
+ }
+
+ /**
+ * Pick corresponding column pages and add to all registered datamap
+ *
+ * @param pageId sequence number of page, start from 0
+ * @param tablePage page data
+ */
+ public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
+ Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
+ for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
+ List<String> indexedColumns = entry.getKey();
+ ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
+ for (int i = 0; i < indexedColumns.size(); i++) {
+ pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
+ }
+ List<DataMapWriter> writers = entry.getValue();
+ for (DataMapWriter writer : writers) {
+ writer.onPageAdded(blockletId, pageId, pages);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index f5fdd4d..02ceb06 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
new file mode 100644
index 0000000..6b54d2d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datatypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+
+/**
+ * Generic DataType interface which will be used while data loading for complex types like Array &
+ * Struct, and for the primitive columns nested inside them.
+ */
+public interface GenericDataType<T> {
+
+ /**
+ * @return name of the column
+ */
+ String getName();
+
+ /**
+ * @return name of this column's parent (for columns nested inside a complex type)
+ */
+ String getParentname();
+
+ /**
+ * @param children - child dimension to add under this complex type
+ */
+ void addChildren(GenericDataType children);
+
+ /**
+ * Collects every primitive type column contained in this complex type.
+ * @param primitiveChild - output list that the primitive children are added to
+ */
+ void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
+
+ /**
+ * Serializes the given input value of this type to the byte stream.
+ * @param input - value to serialize
+ * @param dataOutputStream - destination stream
+ * @throws IOException if writing to the stream fails
+ * @throws DictionaryGenerationException if a dictionary surrogate cannot be generated
+ */
+ void writeByteArray(T input, DataOutputStream dataOutputStream)
+ throws IOException, DictionaryGenerationException;
+
+ /**
+ * @return surrogateIndex for primitive column in complex type
+ */
+ int getSurrogateIndex();
+
+ /**
+ * @param surrIndex - surrogate index of primitive column in complex type
+ */
+ void setSurrogateIndex(int surrIndex);
+
+ /**
+ * converts integer surrogate to bit packed surrogate value
+ * @param byteArrayInput - input buffer holding the integer surrogates
+ * @param dataOutputStream - destination stream for the bit packed values
+ * @param generator - key generators used for bit packing
+ * @throws IOException if writing to the stream fails
+ * @throws KeyGenException if key generation fails
+ */
+ void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+ KeyGenerator[] generator) throws IOException, KeyGenException;
+
+ /**
+ * @return columns count of each complex type
+ */
+ int getColsCount();
+
+ /**
+ * @return column uuid string
+ */
+ String getColumnId();
+
+ /**
+ * set array index to be referred while creating metadata column
+ * @param outputArrayIndex - array index of the metadata column
+ */
+ void setOutputArrayIndex(int outputArrayIndex);
+
+ /**
+ * @return max array index count of metadata column
+ */
+ int getMaxOutputArrayIndex();
+
+ /**
+ * Split byte array into complex metadata column and primitive column
+ * @param columnsArray - output columnar data, one list per output column
+ * @param inputArray - serialized input to split
+ */
+ void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
+
+ /**
+ * @return current read row count
+ */
+ int getDataCounter();
+
+ /**
+ * fill agg key block including complex types
+ * @param aggKeyBlockWithComplex - output list including complex type entries
+ * @param aggKeyBlock - agg key block of the primitive columns
+ */
+ void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
+
+ /**
+ * fill block key size including complex types
+ * @param blockKeySizeWithComplex - output list including complex type entries
+ * @param primitiveBlockKeySize - block key sizes of the primitive columns
+ */
+ void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
+
+ /**
+ * fill cardinality value including complex types
+ * @param dimCardWithComplex - output list including complex type entries
+ * @param maxSurrogateKeyArray - max surrogate keys of the primitive columns
+ */
+ void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
+
+ /**
+ * Fill the cardinality of the primitive datatypes
+ * @param dimCardWithComplex - output list to fill
+ */
+ void fillCardinality(List<Integer> dimCardWithComplex);
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index a24a324..e7e48e9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 94ee9f6..a61144e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
index d5730a2..8feea6a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
@@ -21,8 +21,8 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.List;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index d30582b..e9b0a78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -23,13 +23,13 @@ import java.util.Map;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.newflow.DataField;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 9c48af7..a716340 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -35,10 +35,8 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -51,6 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
import org.apache.carbondata.processing.store.file.FileManager;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -74,6 +73,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* data writer
*/
private CarbonFactDataWriter dataWriter;
+
/**
* File manager
*/
@@ -87,11 +87,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process
* once this size of input is reached
*/
- private int blockletSize;
- /**
- * keyGenerator
- */
- private ColumnarSplitter columnarSplitter;
+ private int pageSize;
/**
* keyBlockHolder
*/
@@ -120,7 +116,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/**
* a private class that will hold the data for blocklets
*/
- private BlockletDataHolder blockletDataHolder;
+ private TablePageList tablePageList;
/**
* number of cores configured
*/
@@ -146,8 +142,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private ColumnarFormatVersion version;
- private SortScopeOptions.SortScope sortScope;
-
/**
* CarbonFactDataHandler constructor
*/
@@ -202,11 +196,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
noInvertedIdxCol += (cd.getColName() + ",");
}
}
+
LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol);
}
private void initParameters(CarbonFactDataHandlerModel model) {
- this.sortScope = model.getSortScope();
+ SortScopeOptions.SortScope sortScope = model.getSortScope();
this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
//TODO need to pass carbon table identifier to metadata
@@ -254,10 +249,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
consumerExecutorService = Executors.newFixedThreadPool(1);
consumerExecutorServiceTaskList = new ArrayList<>(1);
semaphore = new Semaphore(numberOfCores);
- blockletDataHolder = new BlockletDataHolder();
+ tablePageList = new TablePageList();
// Start the consumer which will take each blocklet/page in order and write to a file
- Consumer consumer = new Consumer(blockletDataHolder);
+ Consumer consumer = new Consumer(tablePageList);
consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
}
@@ -314,20 +309,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.entryCount++;
// if entry count reaches to leaf node size then we are ready to write
// this to leaf node file and update the intermediate files
- if (this.entryCount == this.blockletSize) {
+ if (this.entryCount == this.pageSize) {
try {
semaphore.acquire();
producerExecutorServiceTaskList.add(
producerExecutorService.submit(
- new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
+ new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, false)
)
);
blockletProcessingCount.incrementAndGet();
// set the entry count to zero
processedDataCount += entryCount;
LOGGER.info("Total Number Of records added to store: " + processedDataCount);
- dataRows = new ArrayList<>(this.blockletSize);
+ dataRows = new ArrayList<>(this.pageSize);
this.entryCount = 0;
} catch (InterruptedException e) {
LOGGER.error(e, e.getMessage());
@@ -339,10 +334,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/**
* generate the EncodedTablePage from the input rows (one page in case of V3 format)
*/
- private EncodedTablePage processDataRows(List<CarbonRow> dataRows)
+ private TablePage processDataRows(List<CarbonRow> dataRows)
throws CarbonDataWriterException, KeyGenException, MemoryException, IOException {
if (dataRows.size() == 0) {
- return EncodedTablePage.newEmptyInstance();
+ return new TablePage(model, 0);
}
TablePage tablePage = new TablePage(model, dataRows.size());
int rowId = 0;
@@ -352,11 +347,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
tablePage.addRow(rowId++, row);
}
- EncodedTablePage encoded = tablePage.encode();
- tablePage.freeMemory();
+ tablePage.encode();
LOGGER.info("Number Of records processed: " + dataRows.size());
- return encoded;
+ return tablePage;
}
/**
@@ -370,7 +364,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
try {
semaphore.acquire();
producerExecutorServiceTaskList.add(producerExecutorService
- .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
+ .submit(new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, true)));
blockletProcessingCount.incrementAndGet();
processedDataCount += entryCount;
closeWriterExecutionService(producerExecutorService);
@@ -471,19 +465,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private void setWritingConfiguration() throws CarbonDataWriterException {
// get blocklet size
- this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+ this.pageSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
if (version == ColumnarFormatVersion.V3) {
- this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+ this.pageSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
}
- LOGGER.info("Number of rows per column blocklet " + blockletSize);
- dataRows = new ArrayList<>(this.blockletSize);
+ LOGGER.info("Number of rows per column blocklet " + pageSize);
+ dataRows = new ArrayList<>(this.pageSize);
int dimSet =
Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
- // if atleast one dimension is present then initialize column splitter otherwise null
+ // if at least one dimension is present then initialize column splitter otherwise null
int noOfColStore = colGrpModel.getNoOfColumnStore();
int[] keyBlockSize = new int[noOfColStore + getExpandedComplexColsCount()];
@@ -494,16 +488,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
//row store will be in single column store
//e.g if {0,1,2,3,4,5} is dimension and {0,1,2) is row store dimension
//than below splitter will return column as {0,1,2}{3}{4}{5}
- this.columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
+ ColumnarSplitter columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore);
this.keyBlockHolder =
- new CarbonKeyBlockHolder[this.columnarSplitter.getBlockKeySize().length];
+ new CarbonKeyBlockHolder[columnarSplitter.getBlockKeySize().length];
} else {
this.keyBlockHolder = new CarbonKeyBlockHolder[0];
}
for (int i = 0; i < keyBlockHolder.length; i++) {
- this.keyBlockHolder[i] = new CarbonKeyBlockHolder(blockletSize);
+ this.keyBlockHolder[i] = new CarbonKeyBlockHolder(pageSize);
this.keyBlockHolder[i].resetCounter();
}
@@ -535,7 +529,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
.getBlockKeySize());
System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
blockKeySize.length - noOfColStore);
- this.dataWriter = getFactDataWriter(keyBlockSize);
+ this.dataWriter = getFactDataWriter();
this.dataWriter.setIsNoDictionary(isNoDictionary);
// initialize the channel;
this.dataWriter.initializeWriter();
@@ -574,21 +568,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/**
* Below method will be used to get the fact data writer instance
*
- * @param keyBlockSize
* @return data writer instance
*/
- private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
+ private CarbonFactDataWriter<?> getFactDataWriter() {
return CarbonDataWriterFactory.getInstance()
- .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
+ .getFactDataWriter(version, getDataWriterVo());
}
/**
* Below method will be used to get the writer vo
*
- * @param keyBlockSize size of each key block
* @return data writer vo object
*/
- private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) {
+ private CarbonDataWriterVo getDataWriterVo() {
CarbonDataWriterVo carbonDataWriterVo = new CarbonDataWriterVo();
carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
@@ -608,6 +600,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
carbonDataWriterVo.setBucketNumber(model.getBucketId());
carbonDataWriterVo.setTaskExtension(model.getTaskExtension());
carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp());
+ carbonDataWriterVo.setListener(model.getDataMapWriterlistener());
return carbonDataWriterVo;
}
@@ -644,14 +637,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
/**
- * This class will hold the holder objects and manage producer and consumer for reading
- * and writing the blocklet data
+ * This class will hold the table page data
*/
- private final class BlockletDataHolder {
+ private final class TablePageList {
/**
- * array of blocklet data holder objects
+ * array of table page added by Producer and get by Consumer
*/
- private EncodedTablePage[] encodedTablePages;
+ private TablePage[] tablePages;
/**
* flag to check whether the producer has completed processing for holder
* object which is required to be picked form an index
@@ -662,8 +654,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private int currentIndex;
- private BlockletDataHolder() {
- encodedTablePages = new EncodedTablePage[numberOfCores];
+ private TablePageList() {
+ tablePages = new TablePage[numberOfCores];
available = new AtomicBoolean(false);
}
@@ -671,32 +663,32 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @return a node holder object
* @throws InterruptedException if consumer thread is interrupted
*/
- public synchronized EncodedTablePage get() throws InterruptedException {
- EncodedTablePage encodedTablePage = encodedTablePages[currentIndex];
+ public synchronized TablePage get() throws InterruptedException {
+ TablePage tablePage = tablePages[currentIndex];
// if node holder is null means producer thread processing the data which has to
// be inserted at this current index has not completed yet
- if (null == encodedTablePage && !processingComplete) {
+ if (null == tablePage && !processingComplete) {
available.set(false);
}
while (!available.get()) {
wait();
}
- encodedTablePage = encodedTablePages[currentIndex];
- encodedTablePages[currentIndex] = null;
+ tablePage = tablePages[currentIndex];
+ tablePages[currentIndex] = null;
currentIndex++;
// reset current index when it reaches length of node holder array
- if (currentIndex >= encodedTablePages.length) {
+ if (currentIndex >= tablePages.length) {
currentIndex = 0;
}
- return encodedTablePage;
+ return tablePage;
}
/**
* @param encodedTablePage
* @param index
*/
- public synchronized void put(EncodedTablePage encodedTablePage, int index) {
- encodedTablePages[index] = encodedTablePage;
+ public synchronized void put(TablePage tablePage, int index) {
+ tablePages[index] = tablePage;
// notify the consumer thread when index at which object is to be inserted
// becomes equal to current index from where data has to be picked for writing
if (index == currentIndex) {
@@ -711,16 +703,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private final class Producer implements Callable<Void> {
- private BlockletDataHolder blockletDataHolder;
+ private TablePageList tablePageList;
private List<CarbonRow> dataRows;
- private int sequenceNumber;
+ private int pageId;
private boolean isLastPage;
- private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows,
- int sequenceNumber, boolean isLastPage) {
- this.blockletDataHolder = blockletDataHolder;
+ private Producer(TablePageList tablePageList, List<CarbonRow> dataRows,
+ int pageId, boolean isLastPage) {
+ this.tablePageList = tablePageList;
this.dataRows = dataRows;
- this.sequenceNumber = sequenceNumber;
+ this.pageId = pageId;
this.isLastPage = isLastPage;
}
@@ -732,11 +724,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
@Override public Void call() throws Exception {
try {
- EncodedTablePage encodedTablePage = processDataRows(dataRows);
- encodedTablePage.setIsLastPage(isLastPage);
+ TablePage tablePage = processDataRows(dataRows);
+ tablePage.setIsLastPage(isLastPage);
// insert the object in array according to sequence number
- int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
- blockletDataHolder.put(encodedTablePage, indexInNodeHolderArray);
+ int indexInNodeHolderArray = (pageId - 1) % numberOfCores;
+ tablePageList.put(tablePage, indexInNodeHolderArray);
return null;
} catch (Throwable throwable) {
LOGGER.error(throwable, "Error in producer");
@@ -752,10 +744,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private final class Consumer implements Callable<Void> {
- private BlockletDataHolder blockletDataHolder;
+ private TablePageList tablePageList;
- private Consumer(BlockletDataHolder blockletDataHolder) {
- this.blockletDataHolder = blockletDataHolder;
+ private Consumer(TablePageList tablePageList) {
+ this.tablePageList = tablePageList;
}
/**
@@ -766,11 +758,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
@Override public Void call() throws Exception {
while (!processingComplete || blockletProcessingCount.get() > 0) {
- EncodedTablePage encodedTablePage = null;
+ TablePage tablePage = null;
try {
- encodedTablePage = blockletDataHolder.get();
- if (null != encodedTablePage) {
- dataWriter.writeTablePage(encodedTablePage);
+ tablePage = tablePageList.get();
+ if (null != tablePage) {
+ dataWriter.writeTablePage(tablePage);
+ tablePage.freeMemory();
}
blockletProcessingCount.decrementAndGet();
} catch (Throwable throwable) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 51ec84b..c059030 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -39,6 +38,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
@@ -159,6 +160,8 @@ public class CarbonFactDataHandlerModel {
private SortScopeOptions.SortScope sortScope;
+ private DataMapWriterListener dataMapWriterlistener;
+
/**
* Create the model using @{@link CarbonDataLoadConfiguration}
*/
@@ -254,6 +257,11 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.taskExtension = taskExtension;
carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+
+ DataMapWriterListener listener = new DataMapWriterListener();
+ listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId());
+ carbonFactDataHandlerModel.dataMapWriterlistener = listener;
+
return carbonFactDataHandlerModel;
}
@@ -557,5 +565,9 @@ public class CarbonFactDataHandlerModel {
public SortScopeOptions.SortScope getSortScope() {
return sortScope;
}
+
+ public DataMapWriterListener getDataMapWriterlistener() {
+ return dataMapWriterlistener;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 03f3e5e..d2363f1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.DimensionType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -44,9 +44,8 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.spark.sql.types.Decimal;
@@ -73,7 +72,12 @@ public class TablePage {
private TablePageKey key;
- private ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+ private EncodedTablePage encodedTablePage;
+
+ private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+
+ // true if it is last page of all input rows
+ private boolean isLastPage;
TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
this.model = model;
@@ -240,14 +244,16 @@ public class TablePage {
return output;
}
- EncodedTablePage encode() throws KeyGenException, MemoryException, IOException {
+ void encode() throws KeyGenException, MemoryException, IOException {
// encode dimensions and measure
EncodedDimensionPage[] dimensions = encodeAndCompressDimensions();
EncodedMeasurePage[] measures = encodeAndCompressMeasures();
- return EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
+ this.encodedTablePage = EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
}
- private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+ public EncodedTablePage getEncodedTablePage() {
+ return encodedTablePage;
+ }
// apply measure and set encodedData in `encodedData`
private EncodedMeasurePage[] encodeAndCompressMeasures()
@@ -301,6 +307,52 @@ public class TablePage {
encodedDimensions.addAll(encodedComplexDimenions);
return encodedDimensions.toArray(new EncodedDimensionPage[encodedDimensions.size()]);
}
+
+ /**
+ * Returns the column page of the specified column name (matched case-insensitively).
+ * Dictionary and direct-dictionary dimensions are served from dictDimensionPages,
+ * plain-value dimensions from noDictDimensionPages, and measures from measurePage.
+ * Complex dimensions are skipped (datamap on complex column is not supported).
+ * @throws IllegalArgumentException if no dimension or measure has the given name
+ */
+ public ColumnPage getColumnPage(String columnName) {
+ int dictDimensionIndex = -1;
+ int noDictDimensionIndex = -1;
+ ColumnPage page = null;
+ TableSpec spec = model.getTableSpec();
+ int numDimensions = spec.getNumDimensions();
+ for (int i = 0; i < numDimensions; i++) {
+ DimensionType type = spec.getDimensionSpec(i).getDimensionType();
+ // pages are stored in separate arrays per dimension kind, so keep a running
+ // index into each array while walking the table spec in declaration order
+ if ((type == DimensionType.GLOBAL_DICTIONARY) || (type == DimensionType.DIRECT_DICTIONARY)) {
+ page = dictDimensionPages[++dictDimensionIndex];
+ } else if (type == DimensionType.PLAIN_VALUE) {
+ page = noDictDimensionPages[++noDictDimensionIndex];
+ } else {
+ // do not support datamap on complex column
+ continue;
+ }
+ String fieldName = spec.getDimensionSpec(i).getFieldName();
+ if (fieldName.equalsIgnoreCase(columnName)) {
+ return page;
+ }
+ }
+ int numMeasures = spec.getNumMeasures();
+ for (int i = 0; i < numMeasures; i++) {
+ String fieldName = spec.getMeasureSpec(i).getFieldName();
+ if (fieldName.equalsIgnoreCase(columnName)) {
+ return measurePage[i];
+ }
+ }
+ throw new IllegalArgumentException("DataMap: must have '" + columnName + "' column in schema");
+ }
+
+ /** @return true if this is the last page of all input rows */
+ public boolean isLastPage() {
+ return isLastPage;
+ }
+
+ /** @param isWriteAll true when this page is built from the final flush of input rows */
+ public void setIsLastPage(boolean isWriteAll) {
+ this.isLastPage = isWriteAll;
+ }
+
+ /** @return number of rows held in this page */
+ public int getPageSize() {
+ return pageSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a34ed01..bcc0112 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -24,7 +24,6 @@ import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-//import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
@@ -44,17 +43,14 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonMergerUtil;
@@ -66,6 +62,7 @@ import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.IndexHeader;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.file.FileData;
import org.apache.commons.lang3.ArrayUtils;
@@ -97,11 +94,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
protected String carbonDataFileTempPath;
/**
- * The name of carbonData file
+ * The name of carbonData file (blockId)
*/
protected String carbonDataFileName;
/**
+ * The sequence number of blocklet inside one block
+ */
+ protected int blockletId = 0;
+
+ /**
+ * The sequence number of page inside one blocklet
+ */
+ protected int pageId = 0;
+
+ /**
* Local cardinality for the segment
*/
protected int[] localCardinality;
@@ -132,7 +139,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
/**
* data block size for one carbon data file
*/
- private long dataBlockSize;
+ private long blockSizeThreshold;
/**
* file size at any given point
*/
@@ -152,6 +159,11 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
+ /**
+ * listener to write data map
+ */
+ protected DataMapWriterListener listener;
+
public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
this.dataWriterVo = dataWriterVo;
this.blockletInfoList =
@@ -163,22 +175,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.fileSizeInBytes =
(long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
* CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
- /*
- size reserved in one file for writing block meta data. It will be in percentage
- */
+
+ // size reserved in one file for writing block meta data. It will be in percentage
int spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
.getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
- this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
- LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + dataBlockSize);
+ this.blockSizeThreshold =
+ fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
+ LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
+ blockSizeThreshold);
this.executorService = Executors.newFixedThreadPool(1);
executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// in case of compaction we will pass the cardinality.
this.localCardinality = dataWriterVo.getColCardinality();
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo
- .getTableName());
+
//TODO: We should delete the levelmetadata file after reading here.
// so only data loading flow will need to read from cardinality file.
if (null == this.localCardinality) {
@@ -202,6 +213,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.dataChunksLength = new ArrayList<>();
blockletMetadata = new ArrayList<BlockletInfo3>();
blockletIndex = new ArrayList<>();
+ listener = dataWriterVo.getListener();
}
/**
@@ -241,18 +253,19 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
}
/**
- * This method will be used to update the file channel with new file; new
- * file will be created once existing file reached the file size limit This
+ * This method will be used to update the file channel with new file if exceeding block size
+ * threshold; a new file will be created once the existing file reaches the file size limit. This
* method will first check whether existing file size is exceeded the file
* size limit if yes then write the leaf metadata to file then set the
* current file size to 0 close the existing file channel get the new file
* name and get the channel for new file
*
- * @param blockletDataSize data size of one block
+ * @param blockletSizeToBeAdded data size of one block
* @throws CarbonDataWriterException if any problem
*/
- protected void updateBlockletFileChannel(long blockletDataSize) throws CarbonDataWriterException {
- if ((currentFileSize + blockletDataSize) >= dataBlockSize && currentFileSize != 0) {
+ protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
+ throws CarbonDataWriterException {
+ if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
// set the current file size to zero
LOGGER.info("Writing data to file as max file size reached for file: "
+ carbonDataFileTempPath + " .Data block size: " + currentFileSize);
@@ -265,16 +278,42 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.dataChunksLength = new ArrayList<>();
this.blockletMetadata = new ArrayList<>();
this.blockletIndex = new ArrayList<>();
- CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
- // rename carbon data file from in progress status to actual
- renameCarbonDataFile();
- executorServiceSubmitList.add(executorService
- .submit(new CopyThread(this.carbonDataFileTempPath
- .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')))));
+ commitCurrentFile(false);
// initialize the new channel
initializeWriter();
}
- currentFileSize += blockletDataSize;
+ currentFileSize += blockletSizeToBeAdded;
+ }
+
+ /**
+ * Notifies the registered DataMap writer listener (if any) that a new
+ * carbondata block (file) is about to be written.
+ */
+ private void notifyDataMapBlockStart() {
+ if (listener != null) {
+ listener.onBlockStart(carbonDataFileName);
+ }
+ }
+
+ /**
+ * Notifies the registered DataMap writer listener (if any) that the current
+ * carbondata block (file) is finished, and resets the per-block blocklet
+ * sequence counter so the next block starts from blocklet 0.
+ */
+ private void notifyDataMapBlockEnd() {
+ if (listener != null) {
+ listener.onBlockEnd(carbonDataFileName);
+ }
+ blockletId = 0;
+ }
+
+ /**
+ * Finishes writing the current file: notifies the DataMap listener of block
+ * end, closes the stream and channel, renames the in-progress temp file to
+ * its final name, and copies it to the carbon store path.
+ *
+ * @param copyInCurrentThread true to copy synchronously in the calling thread
+ * (used on final close); false to submit the copy to
+ * the executor service and continue writing
+ */
+ protected void commitCurrentFile(boolean copyInCurrentThread) {
+ notifyDataMapBlockEnd();
+ CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+ // rename carbon data file from in progress status to actual
+ renameCarbonDataFile();
+ // strip the trailing in-progress extension to get the final file name
+ String fileName = this.carbonDataFileTempPath.substring(0,
+ this.carbonDataFileTempPath.lastIndexOf('.'));
+ if (copyInCurrentThread) {
+ copyCarbonDataFileToCarbonStorePath(fileName);
+ } else {
+ // async copy; the Future is collected so closeWriter can await completion
+ executorServiceSubmitList.add(executorService.submit(new CopyThread(fileName)));
+ }
+ }
/**
@@ -310,6 +349,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
throw new CarbonDataWriterException("Problem while getting the FileChannel for Leaf File",
fileNotFoundException);
}
+ notifyDataMapBlockStart();
}
private int initFileCount() {
@@ -433,18 +473,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* @throws CarbonDataWriterException
*/
public void closeWriter() throws CarbonDataWriterException {
- CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
if (this.blockletInfoList.size() > 0) {
- renameCarbonDataFile();
- copyCarbonDataFileToCarbonStorePath(
- this.carbonDataFileTempPath
- .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
+ commitCurrentFile(true);
try {
writeIndexFile();
} catch (IOException e) {
throw new CarbonDataWriterException("Problem while writing the index file", e);
}
}
+ CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
closeExecutorService();
}
@@ -590,17 +627,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
}
/**
- * This method will be used to write leaf data to file
- * file format
- * <key><measure1><measure2>....
- *
- * @throws CarbonDataWriterException
- * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
- */
- public abstract void writeTablePage(EncodedTablePage encodedTablePage)
- throws CarbonDataWriterException;
-
- /**
* Below method will be used to update the min or max value
* by removing the length from it
*
@@ -608,10 +634,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
protected byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
return valueWithLength;
-// ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
-// byte[] actualValue = new byte[buffer.getShort()];
-// buffer.get(actualValue);
-// return actualValue;
}
/**
@@ -640,5 +662,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
copyCarbonDataFileToCarbonStorePath(fileName);
return null;
}
+
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 225e031..26fff09 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -20,6 +20,7 @@ import java.util.List;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -64,6 +65,8 @@ public class CarbonDataWriterVo {
private int taskExtension;
+ private DataMapWriterListener listener;
+
/**
* @return the storeLocation
*/
@@ -303,4 +306,12 @@ public class CarbonDataWriterVo {
public void setTaskExtension(int taskExtension) {
this.taskExtension = taskExtension;
}
+
+ /**
+ * Sets the DataMap writer listener to be notified of block/blocklet/page
+ * lifecycle events during data write.
+ */
+ public void setListener(DataMapWriterListener listener) {
+ this.listener = listener;
+ }
+
+ /**
+ * @return the configured DataMap writer listener, or null if none was set
+ */
+ public DataMapWriterListener getListener() {
+ return listener;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index f194f74..3b26b7c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -18,14 +18,15 @@
package org.apache.carbondata.processing.store.writer;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.processing.store.TablePage;
public interface CarbonFactDataWriter<T> {
/**
* write a encoded table page
+ * @param tablePage
*/
- void writeTablePage(EncodedTablePage encodedTablePage) throws CarbonDataWriterException;
+ void writeTablePage(TablePage tablePage) throws CarbonDataWriterException;
/**
* Below method will be used to write the leaf meta data to file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index 0f1b52b..f849e21 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.writer.CarbonFooterWriter;
import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.TablePage;
import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
@@ -199,14 +200,14 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
return holder;
}
- @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+ @Override public void writeTablePage(TablePage tablePage)
throws CarbonDataWriterException {
- if (encodedTablePage.getPageSize() == 0) {
+ if (tablePage.getPageSize() == 0) {
return;
}
- long blockletDataSize = encodedTablePage.getEncodedSize();
- updateBlockletFileChannel(blockletDataSize);
- NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
+ long blockletDataSize = tablePage.getEncodedTablePage().getEncodedSize();
+ createNewFileIfReachThreshold(blockletDataSize);
+ NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage());
// write data to file and get its offset
long offset = writeDataToFile(nodeHolder, fileChannel);
// get the blocklet info for currently added blocklet
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index e19a5ce..3f49a7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.writer.CarbonFooterWriter;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.TablePage;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
@@ -63,19 +64,19 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
/**
* Below method will be used to write the data to carbon data file
*
- * @param encodedTablePage
+ * @param tablePage
* @throws CarbonDataWriterException any problem in writing operation
*/
- @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+ @Override public void writeTablePage(TablePage tablePage)
throws CarbonDataWriterException {
- NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
- if (encodedTablePage.getPageSize() == 0) {
+ NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage());
+ if (tablePage.getPageSize() == 0) {
return;
}
// size to calculate the size of the blocklet
int size = 0;
// get the blocklet info object
- BlockletInfoColumnar blockletInfo = getBlockletInfo(encodedTablePage, 0);
+ BlockletInfoColumnar blockletInfo = getBlockletInfo(tablePage.getEncodedTablePage(), 0);
List<DataChunk2> datachunks = null;
try {
@@ -105,7 +106,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength() + size;
// if size of the file already reached threshold size then create a new file and get the file
// channel object
- updateBlockletFileChannel(blockletDataSize);
+ createNewFileIfReachThreshold(blockletDataSize);
// writer the version header in the file if current file size is zero
// this is done so carbondata file can be read separately
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
new file mode 100644
index 0000000..68aee95
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.processing.store.TablePage;
+
+/**
+ * Accumulates table pages for one blocklet before it is flushed to a
+ * carbondata file. Holds both the encoded pages (written to disk) and the
+ * raw pages (needed by DataMap writers), and tracks the accumulated
+ * encoded size so the writer can decide when the blocklet is full.
+ */
+public class BlockletDataHolder {
+ // encoded form of each added page, in add order
+ private List<EncodedTablePage> encodedTablePage;
+ // raw (unencoded) form of each added page, parallel to encodedTablePage
+ private List<TablePage> rawTablePages;
+ // sum of encoded sizes of all pages added since the last clear()
+ private long currentSize;
+
+ public BlockletDataHolder() {
+ this.encodedTablePage = new ArrayList<>();
+ this.rawTablePages = new ArrayList<>();
+ }
+
+ /** Resets the holder for the next blocklet: drops all pages and the size. */
+ public void clear() {
+ encodedTablePage.clear();
+ rawTablePages.clear();
+ currentSize = 0;
+ }
+
+ /**
+ * Adds one raw page; its encoded form is obtained from the page itself and
+ * its encoded size is added to the running total.
+ */
+ public void addPage(TablePage rawTablePage) {
+ EncodedTablePage encodedTablePage = rawTablePage.getEncodedTablePage();
+ this.encodedTablePage.add(encodedTablePage);
+ this.rawTablePages.add(rawTablePage);
+ currentSize += encodedTablePage.getEncodedSize();
+ }
+
+ /**
+ * @return estimated on-disk size of this blocklet: accumulated encoded size
+ * plus a 15% allowance for per-column per-page metadata
+ */
+ public long getSize() {
+ // increasing it by 15 percent for data chunk 3 of each column each page
+ return currentSize + ((currentSize * 15) / 100);
+ }
+
+ /** @return number of pages added since the last clear() */
+ public int getNumberOfPagesAdded() {
+ return encodedTablePage.size();
+ }
+
+ /** @return total row count across all pages currently held */
+ public int getTotalRows() {
+ int rows = 0;
+ for (EncodedTablePage nh : encodedTablePage) {
+ rows += nh.getPageSize();
+ }
+ return rows;
+ }
+
+ public List<EncodedTablePage> getEncodedTablePages() {
+ return encodedTablePage;
+ }
+
+ public List<TablePage> getRawTablePages() {
+ return rawTablePages;
+ }
+}