You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/06 08:50:19 UTC
[4/4] carbondata git commit: [CARBONDATA-2415] Support Refresh
DataMap command for all Index datamap
[CARBONDATA-2415] Support Refresh DataMap command for all Index datamap
Refactor the DataMapWriter interface to accept rows instead of column pages when adding data. This refactoring requires modifying many test case files
Add REFRESH DATAMAP support for all index datamaps, including Lucene and Bloom
Make IndexDataMapRefreshRDD generic for all index datamaps
For all index datamaps, add a unified DMPROPERTY called INDEX_COLUMNS to specify the list of column names to index
This closes #2254
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9db662a2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9db662a2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9db662a2
Branch: refs/heads/master
Commit: 9db662a2d352e379b42495636f27f7c10bf49f6c
Parents: f2fb068
Author: Jacky Li <ja...@qq.com>
Authored: Tue May 1 14:51:08 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Sun May 6 14:20:01 2018 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 6 +
.../carbondata/core/datamap/DataMapChooser.java | 8 +-
.../carbondata/core/datamap/DataMapMeta.java | 21 +-
.../core/datamap/DataMapProvider.java | 53 ++--
.../core/datamap/DataMapRegistry.java | 44 ++-
.../core/datamap/DataMapStoreManager.java | 27 +-
.../core/datamap/IndexDataMapProvider.java | 138 --------
.../apache/carbondata/core/datamap/Segment.java | 4 +
.../carbondata/core/datamap/TableDataMap.java | 4 +-
.../core/datamap/dev/DataMapFactory.java | 76 ++++-
.../core/datamap/dev/DataMapRefresher.java | 36 +++
.../core/datamap/dev/DataMapWriter.java | 54 ++--
.../cgdatamap/CoarseGrainDataMapFactory.java | 11 +-
.../dev/expr/DataMapExprWrapperImpl.java | 2 +-
.../dev/fgdatamap/FineGrainBlocklet.java | 5 +-
.../dev/fgdatamap/FineGrainDataMapFactory.java | 12 +-
.../datamap/status/DataMapStatusManager.java | 2 +-
.../carbondata/core/indexstore/Blocklet.java | 18 +-
.../core/indexstore/ExtendedBlocklet.java | 6 +-
.../blockletindex/BlockletDataMapFactory.java | 14 +-
.../schema/datamap/DataMapClassProvider.java | 17 +-
.../core/metadata/schema/table/CarbonTable.java | 30 +-
.../metadata/schema/table/DataMapSchema.java | 21 ++
.../scanner/impl/BlockletFilterScanner.java | 3 +-
.../core/util/path/CarbonTablePath.java | 34 +-
.../datamap/bloom/BloomCoarseGrainDataMap.java | 8 +-
.../bloom/BloomCoarseGrainDataMapFactory.java | 82 ++---
.../carbondata/datamap/bloom/BloomDMModel.java | 3 +
.../datamap/bloom/BloomDataMapRefresher.java | 89 ++++++
.../datamap/bloom/BloomDataMapWriter.java | 199 ++++++------
.../datamap/examples/MinMaxDataWriter.java | 85 ++---
.../examples/MinMaxIndexDataMapFactory.java | 28 +-
.../lucene/LuceneCoarseGrainDataMapFactory.java | 13 +-
.../lucene/LuceneDataMapFactoryBase.java | 110 +++----
.../datamap/lucene/LuceneDataMapRefresher.java | 224 +++++++++++++
.../datamap/lucene/LuceneDataMapWriter.java | 204 +++++-------
.../datamap/lucene/LuceneFineGrainDataMap.java | 6 +-
.../lucene/LuceneFineGrainDataMapFactory.java | 13 +-
.../lucene/LuceneIndexRefreshBuilder.java | 220 -------------
.../hadoop/api/CarbonInputFormat.java | 4 +-
.../cluster/sdv/generated/LuceneTestCase.scala | 21 +-
.../lucene/LuceneCoarseGrainDataMapSuite.scala | 2 +-
.../lucene/LuceneFineGrainDataMapSuite.scala | 84 ++---
.../testsuite/datamap/CGDataMapTestCase.scala | 100 +++---
.../testsuite/datamap/DataMapWriterSuite.scala | 78 +++--
.../testsuite/datamap/FGDataMapTestCase.scala | 66 ++--
.../testsuite/datamap/TestDataMapStatus.scala | 70 ++--
.../TestInsertAndOtherCommandConcurrent.scala | 45 +--
.../carbondata/datamap/DataMapManager.java | 13 +-
.../datamap/IndexDataMapProvider.java | 147 +++++++++
.../datamap/PreAggregateDataMapProvider.java | 30 +-
.../datamap/TimeseriesDataMapProvider.java | 10 +-
.../datamap/IndexDataMapRefreshRDD.scala | 317 +++++++++++++++++++
.../lucene/LuceneDataMapRefreshRDD.scala | 299 -----------------
.../datamap/CarbonCreateDataMapCommand.scala | 77 +++--
.../datamap/CarbonDataMapRefreshCommand.scala | 17 +-
.../datamap/CarbonDropDataMapCommand.scala | 14 +-
.../sql/parser/CarbonSpark2SqlParser.scala | 7 +-
.../bloom/BloomCoarseGrainDataMapSuite.scala | 122 +++++--
.../datamap/DataMapWriterException.java | 24 ++
.../datamap/DataMapWriterListener.java | 42 ++-
.../CarbonRowDataWriterProcessorStepImpl.java | 9 +-
.../store/CarbonFactDataHandlerModel.java | 24 +-
.../store/writer/AbstractFactDataWriter.java | 3 +-
64 files changed, 1967 insertions(+), 1588 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 648f08e..f9bf220 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1720,6 +1720,12 @@ public final class CarbonCommonConstants {
*/
public static final String CARBON_LUCENE_COMPRESSION_MODE_DEFAULT = "speed";
+ /**
+ * DMPROPERTY for index DataMaps, such as the lucene and bloomfilter DataMaps,
+ * that specifies the list of column names to be indexed
+ */
+ public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 57069b8..478254d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -279,6 +279,10 @@ public class DataMapChooser {
return null;
}
+ /**
+ * Return true if the input datamap contains the columns needed by the
+ * specified expressions
+ */
private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpressions,
Set<ExpressionType> expressionTypes) {
if (mapMeta.getOptimizedOperation().contains(ExpressionType.TEXT_MATCH) &&
@@ -291,8 +295,8 @@ public class DataMapChooser {
}
boolean contains = true;
for (ColumnExpression expression : columnExpressions) {
- if (!mapMeta.getIndexedColumns().contains(expression.getColumnName()) || !mapMeta
- .getOptimizedOperation().containsAll(expressionTypes)) {
+ if (!mapMeta.getIndexedColumnNames().contains(expression.getColumnName()) ||
+ !mapMeta.getOptimizedOperation().containsAll(expressionTypes)) {
contains = false;
break;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
index cf51b11..adf85d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -21,8 +21,12 @@ import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Transformer;
+
/**
* Metadata of the datamap, set by DataMap developer
*/
@@ -31,16 +35,17 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
public class DataMapMeta {
private String dataMapName;
- private List<String> indexedColumns;
+ private List<CarbonColumn> indexedColumns;
private List<ExpressionType> optimizedOperation;
- public DataMapMeta(List<String> indexedColumns, List<ExpressionType> optimizedOperation) {
+ public DataMapMeta(List<CarbonColumn> indexedColumns,
+ List<ExpressionType> optimizedOperation) {
this.indexedColumns = indexedColumns;
this.optimizedOperation = optimizedOperation;
}
- public DataMapMeta(String dataMapName, List<String> indexedColumns,
+ public DataMapMeta(String dataMapName, List<CarbonColumn> indexedColumns,
List<ExpressionType> optimizedOperation) {
this(indexedColumns, optimizedOperation);
this.dataMapName = dataMapName;
@@ -50,10 +55,18 @@ public class DataMapMeta {
return dataMapName;
}
- public List<String> getIndexedColumns() {
+ public List<CarbonColumn> getIndexedColumns() {
return indexedColumns;
}
+ public List<String> getIndexedColumnNames() {
+ return (List<String>) CollectionUtils.collect(indexedColumns, new Transformer() {
+ @Override public Object transform(Object input) {
+ return ((CarbonColumn) input).getColName();
+ }
+ });
+ }
+
public List<ExpressionType> getOptimizedOperation() {
return optimizedOperation;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index 61dcfd1..775b912 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
@@ -42,47 +44,61 @@ import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
*
* <br>Currently CarbonData supports following provider:
* <ol>
- * <li> preaggregate: one type of MVDataMap that do pre-aggregate of single table </li>
- * <li> timeseries: one type of MVDataMap that do pre-aggregate based on time dimension
- * of the table </li>
- * <li> class name of {@link org.apache.carbondata.core.datamap.dev.DataMapFactory}
- * implementation: Developer can implement new type of DataMap by extending
- * {@link org.apache.carbondata.core.datamap.dev.DataMapFactory} </li>
+ * <li> preaggregate: pre-aggregate table of single table </li>
+ * <li> timeseries: pre-aggregate table based on time dimension of the table </li>
+ * <li> lucene: index backed by Apache Lucene </li>
+ * <li> bloomfilter: index backed by Bloom Filter </li>
* </ol>
*
* @since 1.4.0
*/
@InterfaceAudience.Internal
-public interface DataMapProvider {
+public abstract class DataMapProvider {
+
+ private CarbonTable mainTable;
+ private DataMapSchema dataMapSchema;
+
+ public DataMapProvider(CarbonTable mainTable, DataMapSchema dataMapSchema) {
+ this.mainTable = mainTable;
+ this.dataMapSchema = dataMapSchema;
+ }
+
+ protected final CarbonTable getMainTable() {
+ return mainTable;
+ }
+
+ protected final DataMapSchema getDataMapSchema() {
+ return dataMapSchema;
+ }
/**
* Initialize a datamap's metadata.
* This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
* Implementation should initialize metadata for datamap, like creating table
*/
- void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement)
- throws MalformedDataMapCommandException, IOException;
+ public abstract void initMeta(String ctasSqlStatement) throws MalformedDataMapCommandException,
+ IOException;
/**
* Initialize a datamap's data.
* This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
* Implementation should initialize data for datamap, like creating data folders
*/
- void initData(CarbonTable mainTable);
+ public abstract void initData();
/**
- * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String)}.
+ * Opposite operation of {@link #initMeta(String)}.
* This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
* Implementation should clean all meta for the datamap
*/
- void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException;
+ public abstract void cleanMeta() throws IOException;
/**
- * Opposite operation of {@link #initData(CarbonTable)}.
+ * Opposite operation of {@link #initData()}.
* This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
* Implementation should clean all data for the datamap
*/
- void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema);
+ public abstract void cleanData();
/**
* Rebuild the datamap by loading all existing data from mainTable
@@ -90,19 +106,20 @@ public interface DataMapProvider {
* 1. after datamap creation and if `autoRefreshDataMap` is set to true
* 2. user manually trigger refresh datamap command
*/
- void rebuild(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException;
+ public abstract void rebuild() throws IOException, NoSuchDataMapException;
/**
* Build the datamap incrementally by loading specified segment data
* This is called when user manually trigger refresh datamap
*/
- void incrementalBuild(CarbonTable mainTable, DataMapSchema dataMapSchema, String[] segmentIds)
- throws IOException;
+ public abstract void incrementalBuild(String[] segmentIds) throws IOException;
/**
* Provide the datamap catalog instance or null if this datamap not required to rewrite
* the query.
*/
- DataMapCatalog createDataMapCatalog();
+ public abstract DataMapCatalog createDataMapCatalog();
+
+ public abstract DataMapFactory getDataMapFactory();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java
index 1b6782a..8c3640a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java
@@ -17,12 +17,20 @@
package org.apache.carbondata.core.datamap;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.MetadataProcessException;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
/**
* Developer can register a datamap implementation with a short name.
@@ -33,7 +41,7 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
* USING 'short-name-of-the-datamap'
* }
* otherwise, user should use the class name of the datamap implementation to create the datamap
- * (subclass of {@link org.apache.carbondata.core.datamap.dev.DataMapFactory})
+ * (subclass of {@link DataMapFactory})
* <p>
* {@code
* CREATE DATAMAP dm ON TABLE table
@@ -45,14 +53,44 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
public class DataMapRegistry {
private static Map<String, String> shortNameToClassName = new ConcurrentHashMap<>();
- public static void registerDataMap(String datamapClassName, String shortName) {
+ private static void registerDataMap(String datamapClassName, String shortName) {
Objects.requireNonNull(datamapClassName);
Objects.requireNonNull(shortName);
shortNameToClassName.put(shortName, datamapClassName);
}
- public static String getDataMapClassName(String shortName) {
+ private static String getDataMapClassName(String shortName) {
Objects.requireNonNull(shortName);
return shortNameToClassName.get(shortName);
}
+
+ public static DataMapFactory<? extends DataMap> getDataMapFactoryByShortName(
+ CarbonTable table, DataMapSchema dataMapSchema) throws MalformedDataMapCommandException {
+ String providerName = dataMapSchema.getProviderName();
+ try {
+ registerDataMap(
+ DataMapClassProvider.getDataMapProviderOnName(providerName).getClassName(),
+ DataMapClassProvider.getDataMapProviderOnName(providerName).getShortName());
+ } catch (UnsupportedOperationException ex) {
+ throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
+ }
+ DataMapFactory<? extends DataMap> dataMapFactory;
+ String className = getDataMapClassName(providerName.toLowerCase());
+ if (className != null) {
+ try {
+ dataMapFactory = (DataMapFactory<? extends DataMap>)
+ Class.forName(className).getConstructors()[0].newInstance(table, dataMapSchema);
+ } catch (ClassNotFoundException ex) {
+ throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
+ } catch (InvocationTargetException ex) {
+ throw new MalformedDataMapCommandException(ex.getTargetException().getMessage());
+ } catch (InstantiationException | IllegalAccessException | IllegalArgumentException ex) {
+ throw new MetadataProcessException(
+ "failed to create DataMap '" + providerName + "': " + ex.getMessage(), ex);
+ }
+ } else {
+ throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found");
+ }
+ return dataMapFactory;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index a871d57..29a1106 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -87,7 +87,7 @@ public final class DataMapStoreManager {
List<TableDataMap> tableIndices = getAllVisibleDataMap(carbonTable);
if (tableIndices != null) {
for (TableDataMap dataMap : tableIndices) {
- if (mapType == dataMap.getDataMapFactory().getDataMapType()) {
+ if (mapType == dataMap.getDataMapFactory().getDataMapLevel()) {
dataMaps.add(dataMap);
}
}
@@ -262,22 +262,19 @@ public final class DataMapStoreManager {
}
/**
- * Return a new datamap instance for the given
- * @param dataMapSchema
- * @return
- * @throws MalformedDataMapCommandException
+ * Return a new datamap instance and registered in the store manager.
+ * The datamap is created using datamap name, datamap factory class and table identifier.
*/
- public DataMapFactory getDataMapFactoryClass(DataMapSchema dataMapSchema)
+ public DataMapFactory getDataMapFactoryClass(CarbonTable table, DataMapSchema dataMapSchema)
throws MalformedDataMapCommandException {
- DataMapFactory dataMapFactory;
try {
// try to create datamap by reflection to test whether it is a valid DataMapFactory class
- Class<? extends DataMapFactory> factoryClass =
- (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
- return factoryClass.newInstance();
+ return (DataMapFactory)
+ Class.forName(dataMapSchema.getProviderName()).getConstructors()[0]
+ .newInstance(table, dataMapSchema);
} catch (ClassNotFoundException e) {
// try to create DataMapClassProvider instance by taking providerName as short name
- return IndexDataMapProvider.getDataMapFactoryByShortName(dataMapSchema.getProviderName());
+ return DataMapRegistry.getDataMapFactoryByShortName(table, dataMapSchema);
} catch (Throwable e) {
throw new MetadataProcessException(
"failed to get DataMap factory for'" + dataMapSchema.getProviderName() + "'", e);
@@ -290,14 +287,13 @@ public final class DataMapStoreManager {
*/
// TODO: make it private
public TableDataMap createAndRegisterDataMap(CarbonTable table,
- DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
- DataMapFactory dataMapFactory = getDataMapFactoryClass(dataMapSchema);
+ DataMapSchema dataMapSchema) throws MalformedDataMapCommandException {
+ DataMapFactory dataMapFactory = getDataMapFactoryClass(table, dataMapSchema);
return registerDataMap(table, dataMapSchema, dataMapFactory);
}
public TableDataMap registerDataMap(CarbonTable table,
- DataMapSchema dataMapSchema, DataMapFactory dataMapFactory)
- throws IOException, MalformedDataMapCommandException {
+ DataMapSchema dataMapSchema, DataMapFactory dataMapFactory) {
String tableUniqueName = table.getCarbonTableIdentifier().getTableUniqueName();
// Just update the segmentRefreshMap with the table if not added.
getTableSegmentRefresher(table);
@@ -306,7 +302,6 @@ public final class DataMapStoreManager {
tableIndices = new ArrayList<>();
}
- dataMapFactory.init(table, dataMapSchema);
BlockletDetailsFetcher blockletDetailsFetcher;
SegmentPropertiesFetcher segmentPropertiesFetcher = null;
if (dataMapFactory instanceof BlockletDetailsFetcher) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
deleted file mode 100644
index a22bd0b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datamap;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.exceptions.MetadataProcessException;
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
-
-@InterfaceAudience.Internal
-public class IndexDataMapProvider implements DataMapProvider {
-
- public IndexDataMapProvider() {
- }
-
- @Override
- public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement)
- throws MalformedDataMapCommandException, IOException {
- if (mainTable == null) {
- throw new MalformedDataMapCommandException(
- "Parent table is required to create index datamap");
- }
- ArrayList<RelationIdentifier> relationIdentifiers = new ArrayList<>();
- RelationIdentifier relationIdentifier =
- new RelationIdentifier(mainTable.getDatabaseName(), mainTable.getTableName(),
- mainTable.getTableInfo().getFactTable().getTableId());
- relationIdentifiers.add(relationIdentifier);
- dataMapSchema.setRelationIdentifier(relationIdentifier);
- dataMapSchema.setParentTables(relationIdentifiers);
- DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
- DataMapStoreManager.getInstance().registerDataMap(mainTable, dataMapSchema, dataMapFactory);
- DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema);
- }
-
- @Override
- public void initData(CarbonTable mainTable) {
- // Nothing is needed to do by default
- }
-
- @Override
- public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException {
- if (mainTable == null) {
- throw new UnsupportedOperationException("Table need to be specified in index datamaps");
- }
- DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName());
- }
-
- @Override
- public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema) {
- if (mainTable == null) {
- throw new UnsupportedOperationException("Table need to be specified in index datamaps");
- }
- DataMapStoreManager.getInstance().clearDataMap(
- mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName());
- }
-
- @Override
- public void rebuild(CarbonTable mainTable, DataMapSchema dataMapSchema) {
- // Nothing is needed to do by default
- }
-
- @Override public void incrementalBuild(CarbonTable mainTable, DataMapSchema dataMapSchema,
- String[] segmentIds) {
- throw new UnsupportedOperationException();
- }
-
- private DataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema)
- throws MalformedDataMapCommandException {
- DataMapFactory dataMapFactory;
- try {
- // try to create DataMapClassProvider instance by taking providerName as class name
- Class<? extends DataMapFactory> providerClass =
- (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName());
- dataMapFactory = providerClass.newInstance();
- } catch (ClassNotFoundException e) {
- // try to create DataMapClassProvider instance by taking providerName as short name
- dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getProviderName());
- } catch (Throwable e) {
- throw new MetadataProcessException(
- "failed to create DataMapClassProvider '" + dataMapSchema.getProviderName() + "'", e);
- }
- return dataMapFactory;
- }
-
- public static DataMapFactory getDataMapFactoryByShortName(String providerName)
- throws MalformedDataMapCommandException {
- try {
- DataMapRegistry.registerDataMap(
- DataMapClassProvider.getDataMapProviderOnName(providerName).getClassName(),
- DataMapClassProvider.getDataMapProviderOnName(providerName).getShortName());
- } catch (UnsupportedOperationException ex) {
- throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
- }
- DataMapFactory dataMapFactory;
- String className = DataMapRegistry.getDataMapClassName(providerName.toLowerCase());
- if (className != null) {
- try {
- Class<? extends DataMapFactory> datamapClass =
- (Class<? extends DataMapFactory>) Class.forName(className);
- dataMapFactory = datamapClass.newInstance();
- } catch (ClassNotFoundException ex) {
- throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex);
- } catch (Throwable ex) {
- throw new MetadataProcessException("failed to create DataMap '" + providerName + "'", ex);
- }
- } else {
- throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found");
- }
- return dataMapFactory;
- }
-
- @Override public DataMapCatalog createDataMapCatalog() {
- // TODO create abstract class and move the default implementation there.
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 9179bbc..476f9da 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -58,6 +58,10 @@ public class Segment implements Serializable {
*/
private LoadMetadataDetails loadMetadataDetails;
+ public Segment(String segmentNo) {
+ this.segmentNo = segmentNo;
+ }
+
/**
* ReadCommittedScope will be null. So getCommittedIndexFile will not work and will throw
* a NullPointerException. In case getCommittedIndexFile is need to be accessed then
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 2dc6317..314b515 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -155,13 +155,13 @@ public final class TableDataMap extends OperationEventListener {
String writePath =
identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapSchema
.getDataMapName();
- if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
+ if (dataMapFactory.getDataMapLevel() == DataMapLevel.FG) {
FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
}
for (Blocklet blocklet : blocklets) {
ExtendedBlocklet detailedBlocklet = blockletDetailsFetcher
.getExtendedBlocklet(blocklet, distributable.getSegment());
- if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
+ if (dataMapFactory.getDataMapLevel() == DataMapLevel.FG) {
String blockletwritePath =
writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
detailedBlocklet.setDataMapWriterPath(blockletwritePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 20bdfb7..ae34be7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -17,7 +17,9 @@
package org.apache.carbondata.core.datamap.dev;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.core.datamap.DataMapDistributable;
@@ -27,74 +29,114 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.events.Event;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.INDEX_COLUMNS;
/**
- * Interface for datamap factory, it is responsible for creating the datamap.
+ * Interface for datamap of index type, it is responsible for creating the datamap.
*/
-public interface DataMapFactory<T extends DataMap> {
+public abstract class DataMapFactory<T extends DataMap> {
+
+ private CarbonTable carbonTable;
+ private DataMapSchema dataMapSchema;
+
+ public DataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+ this.carbonTable = carbonTable;
+ this.dataMapSchema = dataMapSchema;
+ }
+
+ public CarbonTable getCarbonTable() {
+ return carbonTable;
+ }
+
+ public DataMapSchema getDataMapSchema() {
+ return dataMapSchema;
+ }
/**
- * Initialization of Datamap factory with the carbonTable and datamap name
+ * Create a new writer for this datamap, to write new data into the specified segment and shard
*/
- void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
- throws IOException, MalformedDataMapCommandException;
+ public abstract DataMapWriter createWriter(Segment segment, String shardName)
+ throws IOException;
/**
- * Return a new write for this datamap
+ * Create a new Refresher for this datamap, to rebuild the specified
+ * segment and shard data in the main table.
*/
- DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
+ public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
+ throws IOException;
/**
* Get the datamap for segmentid
*/
- List<T> getDataMaps(Segment segment) throws IOException;
+ public abstract List<T> getDataMaps(Segment segment) throws IOException;
/**
* Get datamaps for distributable object.
*/
- List<T> getDataMaps(DataMapDistributable distributable)
+ public abstract List<T> getDataMaps(DataMapDistributable distributable)
throws IOException;
/**
* Get all distributable objects of a segmentid
* @return
*/
- List<DataMapDistributable> toDistributable(Segment segment);
+ public abstract List<DataMapDistributable> toDistributable(Segment segment);
/**
*
* @param event
*/
- void fireEvent(Event event);
+ public abstract void fireEvent(Event event);
/**
* Clears datamap of the segment
*/
- void clear(Segment segment);
+ public abstract void clear(Segment segment);
/**
* Clear all datamaps from memory
*/
- void clear();
+ public abstract void clear();
/**
* Return metadata of this datamap
*/
- DataMapMeta getMeta();
+ public abstract DataMapMeta getMeta();
/**
* Type of datamap whether it is FG or CG
*/
- DataMapLevel getDataMapType();
+ public abstract DataMapLevel getDataMapLevel();
/**
* delete datamap data if any
*/
- void deleteDatamapData();
+ public abstract void deleteDatamapData();
/**
* This function should return true is the input operation enum will make the datamap become stale
*/
- boolean willBecomeStale(TableOperation operation);
+ public abstract boolean willBecomeStale(TableOperation operation);
+
+ /**
+ * Validate the INDEX_COLUMNS property of this datamap.
+ * The following will be validated:
+ * 1. the INDEX_COLUMNS property is required
+ * 2. INDEX_COLUMNS can't contain illegal arguments (empty, blank)
+ * 3. INDEX_COLUMNS can't contain duplicate columns
+ * 4. INDEX_COLUMNS must exist in the table columns
+ */
+ public void validate() throws MalformedDataMapCommandException {
+ List<CarbonColumn> indexColumns = carbonTable.getIndexedColumns(dataMapSchema);
+ Set<String> unique = new HashSet<>();
+ for (CarbonColumn indexColumn : indexColumns) {
+ unique.add(indexColumn.getColName());
+ }
+ if (unique.size() != indexColumns.size()) {
+ throw new MalformedDataMapCommandException(INDEX_COLUMNS + " has duplicate column");
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
new file mode 100644
index 0000000..770ceca
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Interface to rebuild the datamap for main table with existing data
+ */
+@InterfaceAudience.Developer("DataMap")
+public interface DataMapRefresher {
+ void initialize() throws IOException;
+
+ void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException;
+
+ void finish() throws IOException;
+
+ void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 1933f70..03a369a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -16,14 +16,16 @@
*/
package org.apache.carbondata.core.datamap.dev;
+import java.io.File;
import java.io.IOException;
+import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -36,17 +38,25 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
@InterfaceStability.Evolving
public abstract class DataMapWriter {
- protected AbsoluteTableIdentifier identifier;
+ protected String tablePath;
protected String segmentId;
- protected String writeDirectoryPath;
+ protected String dataMapPath;
- public DataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
- String writeDirectoryPath) {
- this.identifier = identifier;
+ private List<CarbonColumn> indexColumns;
+
+ public DataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+ Segment segment, String shardName) {
+ this.tablePath = tablePath;
this.segmentId = segment.getSegmentNo();
- this.writeDirectoryPath = writeDirectoryPath;
+ this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
+ tablePath, segmentId, dataMapName, shardName);
+ this.indexColumns = indexColumns;
+ }
+
+ protected final List<CarbonColumn> getIndexColumns() {
+ return indexColumns;
}
/**
@@ -54,7 +64,7 @@ public abstract class DataMapWriter {
*
* @param blockId file name of the carbondata file
*/
- public abstract void onBlockStart(String blockId, String indexShardName) throws IOException;
+ public abstract void onBlockStart(String blockId) throws IOException;
/**
* End of block notification
@@ -66,22 +76,22 @@ public abstract class DataMapWriter {
*
* @param blockletId sequence number of blocklet in the block
*/
- public abstract void onBlockletStart(int blockletId);
+ public abstract void onBlockletStart(int blockletId) throws IOException;
/**
* End of blocklet notification
*
* @param blockletId sequence number of blocklet in the block
*/
- public abstract void onBlockletEnd(int blockletId);
+ public abstract void onBlockletEnd(int blockletId) throws IOException;
/**
- * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+ * Add columnar page data to the datamap, order of field is same as `indexColumns` in
* DataMapMeta returned in DataMapFactory.
- * Implementation should copy the content of `pages` as needed, because `pages` memory
- * may be freed after this method returns, if using unsafe column page.
+ * Implementation should copy the content of it as needed, because its memory
+ * may be freed after this method returns, in case of unsafe memory
*/
- public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
+ public abstract void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
throws IOException;
/**
@@ -97,15 +107,15 @@ public abstract class DataMapWriter {
* @throws IOException if IO fails
*/
protected void commitFile(String dataMapFile) throws IOException {
- if (!dataMapFile.startsWith(writeDirectoryPath)) {
+ if (!dataMapFile.startsWith(dataMapPath)) {
throw new UnsupportedOperationException(
"Datamap file " + dataMapFile + " is not written in provided directory path "
- + writeDirectoryPath);
+ + dataMapPath);
}
String dataMapFileName =
- dataMapFile.substring(writeDirectoryPath.length(), dataMapFile.length());
+ dataMapFile.substring(dataMapPath.length(), dataMapFile.length());
String carbonFilePath = dataMapFileName.substring(0, dataMapFileName.lastIndexOf("/"));
- String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+ String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
if (carbonFilePath.length() > 0) {
carbonFilePath = segmentPath + carbonFilePath;
FileFactory.mkdirs(carbonFilePath, FileFactory.getFileType(carbonFilePath));
@@ -115,4 +125,12 @@ public abstract class DataMapWriter {
CarbonUtil.copyCarbonDataFileToCarbonStorePath(dataMapFile, carbonFilePath, 0);
}
+
+ /**
+ * Return store path for datamap
+ */
+ public static String getDefaultDataMapPath(
+ String tablePath, String segmentId, String dataMapName) {
+ return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
index f9fdafb..3de923f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
@@ -21,6 +21,8 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
/**
* Factory for {@link CoarseGrainDataMap}
@@ -31,9 +33,14 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
*/
@InterfaceAudience.Developer("DataMap")
@InterfaceStability.Evolving
-public abstract class CoarseGrainDataMapFactory implements DataMapFactory<CoarseGrainDataMap> {
+public abstract class CoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDataMap> {
- @Override public DataMapLevel getDataMapType() {
+ public CoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+ super(carbonTable, dataMapSchema);
+ }
+
+ @Override
+ public DataMapLevel getDataMapLevel() {
return DataMapLevel.CG;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index eff6b4d..0a3896c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -85,7 +85,7 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
}
@Override public DataMapLevel getDataMapLevel() {
- return dataMap.getDataMapFactory().getDataMapType();
+ return dataMap.getDataMapFactory().getDataMapLevel();
}
@Override public DataMapSchema getDataMapSchema() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
index 9c78cc8..9d8f594 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java
@@ -41,8 +41,9 @@ public class FineGrainBlocklet extends Blocklet implements Serializable {
private List<Page> pages;
- public FineGrainBlocklet(String taskName, String blockletId, List<Page> pages) {
- super(taskName, blockletId);
+ /** For FG, pass the shardName instead of file name */
+ public FineGrainBlocklet(String shardName, String blockletId, List<Page> pages) {
+ super(shardName, blockletId);
this.pages = pages;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java
index 0c9aaed..5e4ecb4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMapFactory.java
@@ -20,6 +20,8 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
/**
* Factory for {@link FineGrainDataMap}
@@ -35,10 +37,14 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
*/
@InterfaceAudience.Developer("DataMap")
@InterfaceStability.Evolving
-public abstract class FineGrainDataMapFactory
- implements DataMapFactory<FineGrainDataMap> {
+public abstract class FineGrainDataMapFactory extends DataMapFactory<FineGrainDataMap> {
- @Override public DataMapLevel getDataMapType() {
+ public FineGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+ super(carbonTable, dataMapSchema);
+ }
+
+ @Override
+ public DataMapLevel getDataMapLevel() {
return DataMapLevel.FG;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
index 31ab4e4..dcad80b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
@@ -51,7 +51,7 @@ public class DataMapStatusManager {
return storageProvider.getDataMapStatusDetails();
}
- public static void disableDataMap(String dataMapName) throws Exception {
+ public static void disableDataMap(String dataMapName) throws IOException, NoSuchDataMapException {
DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
if (dataMapSchema != null) {
List<DataMapSchema> list = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 9b40be4..777a980 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -29,13 +29,13 @@ import org.apache.carbondata.core.metadata.schema.table.Writable;
public class Blocklet implements Writable,Serializable {
/** file path of this blocklet */
- private String taskName;
+ private String filePath;
/** id to identify the blocklet inside the block (it is a sequential number) */
private String blockletId;
- public Blocklet(String taskName, String blockletId) {
- this.taskName = taskName;
+ public Blocklet(String filePath, String blockletId) {
+ this.filePath = filePath;
this.blockletId = blockletId;
}
@@ -47,17 +47,17 @@ public class Blocklet implements Writable,Serializable {
return blockletId;
}
- public String getTaskName() {
- return taskName;
+ public String getFilePath() {
+ return filePath;
}
@Override public void write(DataOutput out) throws IOException {
- out.writeUTF(taskName);
+ out.writeUTF(filePath);
out.writeUTF(blockletId);
}
@Override public void readFields(DataInput in) throws IOException {
- taskName = in.readUTF();
+ filePath = in.readUTF();
blockletId = in.readUTF();
}
@@ -67,7 +67,7 @@ public class Blocklet implements Writable,Serializable {
Blocklet blocklet = (Blocklet) o;
- if (taskName != null ? !taskName.equals(blocklet.taskName) : blocklet.taskName != null) {
+ if (filePath != null ? !filePath.equals(blocklet.filePath) : blocklet.filePath != null) {
return false;
}
return blockletId != null ?
@@ -76,7 +76,7 @@ public class Blocklet implements Writable,Serializable {
}
@Override public int hashCode() {
- int result = taskName != null ? taskName.hashCode() : 0;
+ int result = filePath != null ? filePath.hashCode() : 0;
result = 31 * result + (blockletId != null ? blockletId.hashCode() : 0);
return result;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index ea2752c..077b942 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -33,8 +33,8 @@ public class ExtendedBlocklet extends Blocklet {
private String dataMapUniqueId;
- public ExtendedBlocklet(String path, String blockletId) {
- super(path, blockletId);
+ public ExtendedBlocklet(String filePath, String blockletId) {
+ super(filePath, blockletId);
}
public BlockletDetailInfo getDetailInfo() {
@@ -66,7 +66,7 @@ public class ExtendedBlocklet extends Blocklet {
}
public String getPath() {
- return getTaskName();
+ return getFilePath();
}
public String getDataMapWriterPath() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 0d7539c..c0bc2a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
@@ -72,15 +73,20 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache;
- @Override
- public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+ public BlockletDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+ super(carbonTable, dataMapSchema);
this.identifier = carbonTable.getAbsoluteTableIdentifier();
cache = CacheProvider.getInstance()
.createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
}
@Override
- public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+ public DataMapWriter createWriter(Segment segment, String shardName) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ @Override
+ public DataMapRefresher createRefresher(Segment segment, String shardName) {
throw new UnsupportedOperationException("not implemented");
}
@@ -147,7 +153,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
Blocklet blocklet) throws IOException {
for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
- if (identifier.getIndexFileName().startsWith(blocklet.getTaskName())) {
+ if (identifier.getIndexFileName().startsWith(blocklet.getFilePath())) {
DataMap dataMap = cache.get(identifier);
return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
index 4ab400d..d5a99e8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
@@ -29,7 +29,8 @@ package org.apache.carbondata.core.metadata.schema.datamap;
public enum DataMapClassProvider {
PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", "preaggregate"),
TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries"),
- LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene");
+ LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene"),
+ BLOOMFILTER("org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapFactory", "bloomfilter");
/**
* Fully qualified class name of datamap
@@ -60,18 +61,6 @@ public enum DataMapClassProvider {
dataMapClass.equalsIgnoreCase(shortName)));
}
- public static DataMapClassProvider getDataMapProvider(String dataMapClass) {
- if (TIMESERIES.isEqual(dataMapClass)) {
- return TIMESERIES;
- } else if (PREAGGREGATE.isEqual(dataMapClass)) {
- return PREAGGREGATE;
- } else if (LUCENE.isEqual(dataMapClass)) {
- return LUCENE;
- } else {
- throw new UnsupportedOperationException("Unknown datamap provider/class " + dataMapClass);
- }
- }
-
public static DataMapClassProvider getDataMapProviderOnName(String dataMapShortname) {
if (TIMESERIES.isEqual(dataMapShortname)) {
return TIMESERIES;
@@ -79,6 +68,8 @@ public enum DataMapClassProvider {
return PREAGGREGATE;
} else if (LUCENE.isEqual(dataMapShortname)) {
return LUCENE;
+ } else if (BLOOMFILTER.isEqual(dataMapShortname)) {
+ return BLOOMFILTER;
} else {
throw new UnsupportedOperationException("Unknown datamap provider" + dataMapShortname);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 1875237..4178d8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -980,8 +981,11 @@ public class CarbonTable implements Serializable {
if (!datamaps.isEmpty()) {
for (TableDataMap dataMap : datamaps) {
DataMapFactory factoryClass =
- DataMapStoreManager.getInstance().getDataMapFactoryClass(dataMap.getDataMapSchema());
- return !factoryClass.willBecomeStale(operation);
+ DataMapStoreManager.getInstance().getDataMapFactoryClass(
+ carbonTable, dataMap.getDataMapSchema());
+ if (factoryClass.willBecomeStale(operation)) {
+ return false;
+ }
}
}
} catch (Exception e) {
@@ -993,4 +997,26 @@ public class CarbonTable implements Serializable {
return true;
}
+ /**
+ * Get all index columns specified by dataMapSchema
+ */
+ public List<CarbonColumn> getIndexedColumns(DataMapSchema dataMapSchema)
+ throws MalformedDataMapCommandException {
+ String[] columns = dataMapSchema.getIndexColumns();
+ List<CarbonColumn> indexColumn = new ArrayList<>(columns.length);
+ for (String column : columns) {
+ CarbonColumn carbonColumn = getColumnByName(getTableName(), column.trim().toLowerCase());
+ if (carbonColumn == null) {
+ throw new MalformedDataMapCommandException(String.format(
+ "column '%s' does not exist in table. Please check create DataMap statement.",
+ column));
+ }
+ if (carbonColumn.getColName().isEmpty()) {
+ throw new MalformedDataMapCommandException(
+ CarbonCommonConstants.INDEX_COLUMNS + " contains invalid column name");
+ }
+ indexColumn.add(carbonColumn);
+ }
+ return indexColumn;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 235c312..b22a3d4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -26,9 +26,12 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.INDEX_COLUMNS;
import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
/**
* It is the new schama of datamap and it has less fields compare to {{@link DataMapSchema}}
@@ -201,6 +204,24 @@ public class DataMapSchema implements Serializable, Writable {
}
}
+ /**
+ * Return the array of index column names
+ */
+ public String[] getIndexColumns()
+ throws MalformedDataMapCommandException {
+ String columns = getProperties().get(INDEX_COLUMNS);
+ if (columns == null) {
+ columns = getProperties().get(INDEX_COLUMNS.toLowerCase());
+ }
+ if (columns == null) {
+ throw new MalformedDataMapCommandException(INDEX_COLUMNS + " DMPROPERTY is required");
+ } else if (StringUtils.isBlank(columns)) {
+ throw new MalformedDataMapCommandException(INDEX_COLUMNS + " DMPROPERTY is blank");
+ } else {
+ return columns.split(",", -1);
+ }
+ }
+
@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index 37df0e5..57d8177 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -164,7 +164,8 @@ public class BlockletFilterScanner extends BlockletFullScanner {
totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
totalBlockletStatistic.getCount() + 1);
// set the indexed data if it has any during fgdatamap pruning.
- rawBlockletColumnChunks.setBitSetGroup(rawBlockletColumnChunks.getDataBlock().getIndexedData());
+ BitSetGroup fgBitSetGroup = rawBlockletColumnChunks.getDataBlock().getIndexedData();
+ rawBlockletColumnChunks.setBitSetGroup(fgBitSetGroup);
// apply filter on actual data, for each page
BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
useBitSetPipeLine);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 0538e7f..b9f4838 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -330,6 +330,11 @@ public class CarbonTablePath {
+ bucketNumber + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
}
+ public static String getShardName(Long taskNo, int bucketNumber, int batchNo,
+ String factUpdateTimeStamp) {
+ return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdateTimeStamp;
+ }
+
/**
* Below method will be used to get the carbon index filename
*
@@ -394,6 +399,24 @@ public class CarbonTablePath {
}
/**
+ * Return store path for datamap based on the taskNo. If three tasks get launched during loading,
+ * then three folders will be created based on the shard name and lucene index file will be
+ * written into those folders
+ *
+ * @return store path based on index shard name
+ */
+ public static String getDataMapStorePathOnShardName(String tablePath, String segmentId,
+ String dataMapName, String shardName) {
+ return new StringBuilder()
+ .append(getSegmentPath(tablePath, segmentId))
+ .append(File.separator)
+ .append(dataMapName)
+ .append(File.separator)
+ .append(shardName)
+ .toString();
+ }
+
+ /**
* To manage data file name and composition
*/
public static class DataFileUtil {
@@ -487,6 +510,13 @@ public class CarbonTablePath {
}
/**
+ * Return task id in the carbon data file name
+ */
+ public static long getTaskId(String carbonDataFileName) {
+ return Long.parseLong(getTaskNo(carbonDataFileName).split(BATCH_PREFIX)[0]);
+ }
+
+ /**
* Return the taskId part from taskNo(include taskId + batchNo)
*/
public static long getTaskIdFromTaskNo(String taskNo) {
@@ -643,7 +673,7 @@ public class CarbonTablePath {
}
public static String getCarbonIndexFileName(String actualBlockName) {
- return getUniqueTaskName(actualBlockName) + INDEX_FILE_EXT;
+ return getShardName(actualBlockName) + INDEX_FILE_EXT;
}
/**
@@ -651,7 +681,7 @@ public class CarbonTablePath {
* @param actualBlockName
* @return
*/
- public static String getUniqueTaskName(String actualBlockName) {
+ public static String getShardName(String actualBlockName) {
return DataFileUtil.getTaskNo(actualBlockName) + "-" + DataFileUtil.getBucketNo(actualBlockName)
+ "-" + DataFileUtil.getTimeStampFromFileName(actualBlockName);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 81cccf2..725d5cd 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.fs.PathFilter;
public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
- private String[] indexFilePath;
private Set<String> indexedColumn;
private List<BloomDMModel> bloomIndexList;
private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList;
@@ -88,18 +87,17 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
return path.getName().endsWith(BLOOM_INDEX_SUFFIX);
}
});
- indexFilePath = new String[indexFileStatus.length];
indexedColumn = new HashSet<String>();
bloomIndexList = new ArrayList<BloomDMModel>();
indexCol2BloomDMList = ArrayListMultimap.create();
for (int i = 0; i < indexFileStatus.length; i++) {
- indexFilePath[i] = indexFileStatus[i].getPath().toString();
String indexfilename = indexFileStatus[i].getPath().getName();
String indexCol =
indexfilename.substring(0, indexfilename.length() - BLOOM_INDEX_SUFFIX.length());
indexedColumn.add(indexCol);
- bloomIndexList.addAll(readBloomIndex(indexFilePath[i]));
- indexCol2BloomDMList.put(indexCol, readBloomIndex(indexFilePath[i]));
+ List<BloomDMModel> models = readBloomIndex(indexFileStatus[i].getPath().toString());
+ bloomIndexList.addAll(models);
+ indexCol2BloomDMList.put(indexCol, models);
}
LOGGER.info("find bloom index datamap for column: "
+ StringUtils.join(indexedColumn, ", "));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index b76390f..2d43c40 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -19,10 +19,8 @@ package org.apache.carbondata.datamap.bloom;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -34,6 +32,7 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -50,15 +49,14 @@ import org.apache.carbondata.events.Event;
import org.apache.commons.lang3.StringUtils;
+/**
+ * This class is the factory for the Bloom Filter datamap at blocklet level
+ */
@InterfaceAudience.Internal
-public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrainDataMap> {
+public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDataMap> {
private static final LogService LOGGER = LogServiceFactory.getLogService(
BloomCoarseGrainDataMapFactory.class.getName());
/**
- * property for indexed column
- */
- private static final String BLOOM_COLUMNS = "bloom_columns";
- /**
* property for size of bloom filter
*/
private static final String BLOOM_SIZE = "bloom_size";
@@ -67,21 +65,19 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
* and all the indexed value is distinct.
*/
private static final int DEFAULT_BLOOM_FILTER_SIZE = 32000 * 20;
- private CarbonTable carbonTable;
private DataMapMeta dataMapMeta;
private String dataMapName;
private int bloomFilterSize;
- @Override
- public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
- throws IOException, MalformedDataMapCommandException {
+ public BloomCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+ throws MalformedDataMapCommandException {
+ super(carbonTable, dataMapSchema);
Objects.requireNonNull(carbonTable);
Objects.requireNonNull(dataMapSchema);
- this.carbonTable = carbonTable;
this.dataMapName = dataMapSchema.getDataMapName();
- List<String> indexedColumns = validateAndGetIndexedColumns(dataMapSchema, carbonTable);
+ List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema);
List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
// todo: support more optimize operations
@@ -92,40 +88,6 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
}
/**
- * validate Lucene DataMap BLOOM_COLUMNS
- * 1. require BLOOM_COLUMNS property
- * 2. BLOOM_COLUMNS can't contains illegal argument(empty, blank)
- * 3. BLOOM_COLUMNS can't contains duplicate same columns
- * 4. BLOOM_COLUMNS should be exists in table columns
- */
- private List<String> validateAndGetIndexedColumns(DataMapSchema dmSchema,
- CarbonTable carbonTable) throws MalformedDataMapCommandException {
- String bloomColumnsStr = dmSchema.getProperties().get(BLOOM_COLUMNS);
- if (StringUtils.isBlank(bloomColumnsStr)) {
- throw new MalformedDataMapCommandException(
- String.format("Bloom coarse datamap require proper %s property", BLOOM_COLUMNS));
- }
- String[] bloomColumns = StringUtils.split(bloomColumnsStr, ",", -1);
- List<String> bloomColumnList = new ArrayList<String>(bloomColumns.length);
- Set<String> bloomColumnSet = new HashSet<String>(bloomColumns.length);
- for (String bloomCol : bloomColumns) {
- CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(),
- bloomCol.trim().toLowerCase());
- if (null == column) {
- throw new MalformedDataMapCommandException(
- String.format("%s: %s does not exist in table. Please check create datamap statement",
- BLOOM_COLUMNS, bloomCol));
- }
- if (!bloomColumnSet.add(column.getColName())) {
- throw new MalformedDataMapCommandException(String.format("%s has duplicate column: %s",
- BLOOM_COLUMNS, bloomCol));
- }
- bloomColumnList.add(column.getColName());
- }
- return bloomColumnList;
- }
-
- /**
* validate Lucene DataMap BLOOM_SIZE
* 1. BLOOM_SIZE property is optional, 32000 * 20 will be the default size.
* 2. BLOOM_SIZE should be an integer that greater than 0
@@ -157,21 +119,26 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
}
@Override
- public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+ public DataMapWriter createWriter(Segment segment, String shardName) throws IOException {
LOGGER.info(
String.format("Data of BloomCoarseGranDataMap %s for table %s will be written to %s",
- this.dataMapName, this.carbonTable.getTableName() , writeDirectoryPath));
- return new BloomDataMapWriter(this.carbonTable.getAbsoluteTableIdentifier(),
- this.dataMapMeta, this.bloomFilterSize, segment, writeDirectoryPath);
+ this.dataMapName, getCarbonTable().getTableName() , shardName));
+ return new BloomDataMapWriter(getCarbonTable().getTablePath(), this.dataMapName,
+ this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize);
+ }
+
+ @Override
+ public DataMapRefresher createRefresher(Segment segment, String shardName) throws IOException {
+ return new BloomDataMapRefresher(getCarbonTable().getTablePath(), this.dataMapName,
+ this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize);
}
@Override
public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
try {
- String dataMapStorePath = BloomDataMapWriter.genDataMapStorePath(
- CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segment.getSegmentNo()),
- dataMapName);
+ String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
+ getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
for (CarbonFile carbonFile : carbonFiles) {
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
@@ -212,13 +179,14 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
@Override
public void deleteDatamapData() {
- SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+ SegmentStatusManager ssm =
+ new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
try {
List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
for (Segment segment : validSegments) {
String segmentId = segment.getSegmentNo();
String datamapPath = CarbonTablePath.getSegmentPath(
- carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId)
+ getCarbonTable().getAbsoluteTableIdentifier().getTablePath(), segmentId)
+ File.separator + dataMapName;
if (FileFactory.isFileExist(datamapPath)) {
CarbonFile file = FileFactory.getCarbonFile(datamapPath,
@@ -241,7 +209,7 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai
}
@Override
- public DataMapLevel getDataMapType() {
+ public DataMapLevel getDataMapLevel() {
return DataMapLevel.CG;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
index 6351199..3cf2f3b 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
@@ -22,6 +22,9 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
import com.google.common.hash.BloomFilter;
+/**
+ * This class holds a bloom filter for one blocklet
+ */
@InterfaceAudience.Internal
public class BloomDMModel implements Serializable {
private static final long serialVersionUID = 7281578747306832771L;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
new file mode 100644
index 0000000..cb86c39
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data
+ */
+@InterfaceAudience.Internal
+public class BloomDataMapRefresher extends BloomDataMapWriter implements DataMapRefresher {
+
+ BloomDataMapRefresher(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+ Segment segment, String shardName, int bloomFilterSize) throws IOException {
+ super(tablePath, dataMapName, indexColumns, segment, shardName, bloomFilterSize);
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ super.resetBloomFilters();
+ }
+
+ @Override
+ public void addRow(int blockletId, int pageId, int rowId, Object[] values) {
+ if (currentBlockletId != blockletId) {
+ // new blocklet started, flush bloom filter to datamap file
+ super.writeBloomDataMapFile();
+ currentBlockletId = blockletId;
+ }
+ // for each indexed column, add the data to bloom filter
+ List<CarbonColumn> indexColumns = getIndexColumns();
+ for (int i = 0; i < indexColumns.size(); i++) {
+ Object data = values[i];
+ DataType dataType = indexColumns.get(i).getDataType();
+ byte[] indexValue;
+ if (DataTypes.STRING == dataType) {
+ indexValue = getStringData(data);
+ } else if (DataTypes.BYTE_ARRAY == dataType) {
+ byte[] originValue = (byte[]) data;
+ // String and byte array are LV encoded, and L is of short type (2 bytes)
+ indexValue = new byte[originValue.length - 2];
+ System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
+ } else {
+ indexValue = CarbonUtil.getValueAsBytes(dataType, data);
+ }
+ indexBloomFilters.get(i).put(indexValue);
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ super.finish();
+ }
+
+ @Override
+ public void close() throws IOException {
+ releaseResouce();
+ }
+
+ @Override
+ protected byte[] getStringData(Object data) {
+ return ((String) data).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ }
+}