Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 15:48:48 UTC
[2/2] carbondata git commit: [CARBONDATA-2206] support lucene index datamap
[CARBONDATA-2206] support lucene index datamap
This PR is an initial effort to integrate Lucene as an index datamap into CarbonData.
A new module called carbondata-lucene is added to support the Lucene datamap:
1. Add LuceneFineGrainDataMap, implementing the FineGrainDataMap interface.
2. Add LuceneCoarseGrainDataMap, implementing the CoarseGrainDataMap interface.
3. Support writing the Lucene index via LuceneDataMapWriter.
4. Implement LuceneDataMapFactory.
5. Add a UDF called TEXT_MATCH for filtering string columns through Lucene, as sketched below.
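For illustration, once a Lucene datamap exists on a table, the new UDF can be invoked from a filter predicate. The sketch below is hypothetical (table name, column name, and query term are made up); it assumes TEXT_MATCH is registered through CarbonEnv as in this commit and accepts a Lucene query string:

    import org.apache.spark.sql.SparkSession;

    public class TextMatchSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("text-match-sketch")
            .master("local[1]")
            .getOrCreate();
        // TEXT_MATCH takes a Lucene query string in "field:term" syntax;
        // here it keeps rows whose hypothetical 'name' column matches n10*
        spark.sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')").show();
        spark.stop();
      }
    }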
This closes #2003
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/24ebfbbd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/24ebfbbd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/24ebfbbd
Branch: refs/heads/datamap
Commit: 24ebfbbdadbb056da7b7f4ef83d4c87777007220
Parents: 06c572d
Author: Jacky Li <ja...@qq.com>
Authored: Mon Feb 26 16:30:38 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Feb 27 23:48:27 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/datamap/DataMapChooser.java | 4 +
.../core/datamap/DataMapStoreManager.java | 5 +-
.../carbondata/core/datamap/dev/DataMap.java | 2 +-
.../core/datamap/dev/DataMapFactory.java | 2 +-
.../core/datamap/dev/DataMapWriter.java | 7 +-
.../cgdatamap/CoarseGrainDataMapFactory.java | 1 +
.../core/scan/filter/intf/ExpressionType.java | 3 +-
.../carbondata/core/util/DataTypeUtil.java | 2 +
datamap/lucene/pom.xml | 149 +++++++++
.../lucene/LuceneCoarseGrainDataMap.java | 232 +++++++++++++
.../lucene/LuceneCoarseGrainDataMapFactory.java | 72 ++++
.../lucene/LuceneDataMapDistributable.java | 36 ++
.../lucene/LuceneDataMapFactoryBase.java | 180 ++++++++++
.../datamap/lucene/LuceneDataMapWriter.java | 328 +++++++++++++++++++
.../datamap/lucene/LuceneFineGrainDataMap.java | 280 ++++++++++++++++
.../lucene/LuceneFineGrainDataMapFactory.java | 68 ++++
.../lucene/LuceneCoarseGrainDataMapSuite.scala | 73 +++++
.../lucene/LuceneFineGrainDataMapSuite.scala | 98 ++++++
integration/spark-common-test/pom.xml | 6 +
.../testsuite/datamap/FGDataMapTestCase.scala | 2 +-
.../carbondata/datamap/DataMapProvider.java | 4 +-
.../datamap/IndexDataMapProvider.java | 4 +-
.../datamap/expression/MatchExpression.java | 56 ++++
.../carbondata/datamap/TextMatchUDF.scala | 34 ++
.../scala/org/apache/spark/sql/CarbonEnv.scala | 5 +
.../strategy/CarbonLateDecodeStrategy.scala | 9 +
.../spark/sql/optimizer/CarbonFilters.scala | 4 +
pom.xml | 3 +
.../datamap/DataMapWriterListener.java | 6 +-
.../store/writer/AbstractFactDataWriter.java | 12 +-
.../writer/v3/CarbonFactDataWriterImplV3.java | 4 +-
31 files changed, 1673 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 94b48c6..c8c971d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -228,6 +228,10 @@ public class DataMapChooser {
private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpressions,
Set<ExpressionType> expressionTypes) {
+ if (mapMeta.getOptimizedOperation().contains(ExpressionType.TEXT_MATCH)) {
+ // TODO: fix it with right logic
+ return true;
+ }
if (mapMeta.getIndexedColumns().size() == 0 || columnExpressions.size() == 0) {
return false;
}
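The early return above is an acknowledged placeholder (see the TODO). Purely as an assumption about the intended logic, a refinement could return true only when the filter really contains a TEXT_MATCH over columns this datamap indexes; the hypothetical helper below would slot into DataMapChooser and reuses its existing imports:

    // Hypothetical refinement of the placeholder above; not the committed logic.
    private boolean supportsTextMatch(DataMapMeta mapMeta,
        List<ColumnExpression> columnExpressions, Set<ExpressionType> expressionTypes) {
      if (!mapMeta.getOptimizedOperation().contains(ExpressionType.TEXT_MATCH)
          || !expressionTypes.contains(ExpressionType.TEXT_MATCH)) {
        return false;
      }
      // require every column referenced by the filter to be indexed by this datamap
      for (ColumnExpression column : columnExpressions) {
        if (!mapMeta.getIndexedColumns().contains(column.getColumnName())) {
          return false;
        }
      }
      return true;
    }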
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index e38f4f9..0223ae2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.datamap;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -144,7 +145,7 @@ public final class DataMapStoreManager {
* The datamap is created using datamap name, datamap factory class and table identifier.
*/
private TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
- DataMapSchema dataMapSchema) throws MalformedDataMapCommandException {
+ DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
DataMapFactory dataMapFactory;
try {
// try to create datamap by reflection to test whether it is a valid DataMapFactory class
@@ -162,7 +163,7 @@ public final class DataMapStoreManager {
}
public TableDataMap registerDataMap(AbsoluteTableIdentifier identifier,
- DataMapSchema dataMapSchema, DataMapFactory dataMapFactory) {
+ DataMapSchema dataMapSchema, DataMapFactory dataMapFactory) throws IOException {
String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
// Just update the segmentRefreshMap with the table if not added.
getTableSegmentRefresher(identifier);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index cdd7387..82e20fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -44,7 +44,7 @@ public interface DataMap<T extends Blocklet> {
* @return
*/
List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
- List<String> partitions);
+ List<String> partitions) throws IOException;
// TODO Move this method to Abstract class
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 77f2249..7bf04c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -34,7 +34,7 @@ public interface DataMapFactory<T extends DataMap> {
/**
* Initialization of Datamap factory with the identifier and datamap name
*/
- void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema);
+ void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) throws IOException;
/**
* Return a new writer for this datamap
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 18252b6..8092efa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -53,12 +53,12 @@ public abstract class DataMapWriter {
*
* @param blockId file name of the carbondata file
*/
- public abstract void onBlockStart(String blockId);
+ public abstract void onBlockStart(String blockId) throws IOException;
/**
* End of block notification
*/
- public abstract void onBlockEnd(String blockId);
+ public abstract void onBlockEnd(String blockId) throws IOException;
/**
* Start of new blocklet notification.
@@ -80,7 +80,8 @@ public abstract class DataMapWriter {
* Implementation should copy the content of `pages` as needed, because `pages` memory
* may be freed after this method returns, if using unsafe column page.
*/
- public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+ public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
+ throws IOException;
/**
* This is called during closing of writer.So after this call no more data will be sent to this
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
index 4d20cdb..f9fdafb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.carbondata.core.datamap.dev.cgdatamap;
import org.apache.carbondata.common.annotations.InterfaceAudience;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
index 831acc8..d66c5b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
@@ -42,5 +42,6 @@ public enum ExpressionType {
TRUE,
STARTSWITH,
ENDSWITH,
- CONTAINSWITH
+ CONTAINSWITH,
+ TEXT_MATCH
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 6d224cf..f4b06e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -726,6 +726,8 @@ public final class DataTypeUtil {
*/
public static void setDataTypeConverter(DataTypeConverter converterLocal) {
converter = converterLocal;
+ timeStampformatter.remove();
+ dateformatter.remove();
}
public static DataTypeConverter getDataTypeConverter() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/lucene/pom.xml b/datamap/lucene/pom.xml
new file mode 100644
index 0000000..ee504c6
--- /dev/null
+++ b/datamap/lucene/pom.xml
@@ -0,0 +1,149 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>carbondata-lucene</artifactId>
+ <name>Apache CarbonData :: Lucene Index DataMap</name>
+
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ <lucene.version>6.3.0</lucene.version>
+ <solr.version>6.3.0</solr.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark2</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
+ <version>${lucene.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-analyzers-common</artifactId>
+ <version>${lucene.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-queryparser</artifactId>
+ <version>${lucene.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-sandbox</artifactId>
+ <version>${lucene.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-core</artifactId>
+ <version>${solr.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>${solr.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/resources</directory>
+ </resource>
+ <resource>
+ <directory>.</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
new file mode 100644
index 0000000..0b7df86
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+@InterfaceAudience.Internal
+public class LuceneCoarseGrainDataMap extends CoarseGrainDataMap {
+
+ /**
+ * log information
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LuceneCoarseGrainDataMap.class.getName());
+
+ public static final int BLOCKID_ID = 0;
+
+ public static final int BLOCKLETID_ID = 1;
+
+ public static final int PAGEID_ID = 2;
+
+ public static final int ROWID_ID = 3;
+ /**
+ * searcher object for this datamap
+ */
+ private IndexSearcher indexSearcher = null;
+
+ /**
+ * default max values to return
+ */
+ private static int MAX_RESULT_NUMBER = 100;
+
+ /**
+ * analyzer for lucene index
+ */
+ private Analyzer analyzer;
+
+ LuceneCoarseGrainDataMap(Analyzer analyzer) {
+ this.analyzer = analyzer;
+ }
+
+ /**
+ * It is called to load the data map to memory or to initialize it.
+ */
+ @Override
+ public void init(DataMapModel dataMapModel) throws MemoryException, IOException {
+ // get this path from file path
+ Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
+
+ LOGGER.info("Lucene index read path " + indexPath.toString());
+
+ // get the file system; use the HDFS file system implemented in the Solr project
+ FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+ // check this path valid
+ if (!fs.exists(indexPath)) {
+ String errorMessage = String.format("index directory %s not exists.", indexPath);
+ LOGGER.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ if (!fs.isDirectory(indexPath)) {
+ String errorMessage = String.format("error index path %s, must be directory", indexPath);
+ LOGGER.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ // open this index path , use HDFS default configuration
+ Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+ IndexReader indexReader = DirectoryReader.open(indexDir);
+ if (indexReader == null) {
+ throw new RuntimeException("failed to create index reader object");
+ }
+
+ // create an index searcher object
+ indexSearcher = new IndexSearcher(indexReader);
+ }
+
+ /**
+ * Prune the datamap with filter expression. It returns the list of
+ * blocklets where these filters can exist.
+ */
+ @Override
+ public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+ List<String> partitions) throws IOException {
+
+ // convert filter expr into lucene list query
+ List<String> fields = new ArrayList<String>();
+
+ // only for test: query all data
+ String strQuery = "*:*";
+
+ String[] sFields = new String[fields.size()];
+ fields.toArray(sFields);
+
+ // get analyzer
+ if (analyzer == null) {
+ analyzer = new StandardAnalyzer();
+ }
+
+ // use MultiFieldQueryParser to parse the query
+ QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer);
+ Query query;
+ try {
+ query = queryParser.parse(strQuery);
+ } catch (ParseException e) {
+ String errorMessage = String
+ .format("failed to filter block with query %s, detail is %s", strQuery, e.getMessage());
+ LOGGER.error(errorMessage);
+ return null;
+ }
+
+ // execute index search
+ TopDocs result;
+ try {
+ result = indexSearcher.search(query, MAX_RESULT_NUMBER);
+ } catch (IOException e) {
+ String errorMessage =
+ String.format("failed to search lucene data, detail is %s", e.getMessage());
+ LOGGER.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ // temporary data, delete duplicated data
+ // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
+ Map<String, Set<Number>> mapBlocks = new HashMap<String, Set<Number>>();
+
+ for (ScoreDoc scoreDoc : result.scoreDocs) {
+ // get a document
+ Document doc = indexSearcher.doc(scoreDoc.doc);
+
+ // get all fields
+ List<IndexableField> fieldsInDoc = doc.getFields();
+
+ // get this block id Map<BlockId, Set<BlockletId>>>>
+ String blockId = fieldsInDoc.get(BLOCKID_ID).stringValue();
+ Set<Number> setBlocklets = mapBlocks.get(blockId);
+ if (setBlocklets == null) {
+ setBlocklets = new HashSet<Number>();
+ mapBlocks.put(blockId, setBlocklets);
+ }
+
+ // get the blocklet id Set<BlockletId>
+ Number blockletId = fieldsInDoc.get(BLOCKLETID_ID).numericValue();
+ if (!setBlocklets.contains(blockletId.intValue())) {
+ setBlocklets.add(blockletId.intValue());
+ }
+ }
+
+ // result blocklets
+ List<Blocklet> blocklets = new ArrayList<Blocklet>();
+
+ // transform all blocks into result type blocklets Map<BlockId, Set<BlockletId>>
+ for (Map.Entry<String, Set<Number>> mapBlock : mapBlocks.entrySet()) {
+ String blockId = mapBlock.getKey();
+ Set<Number> setBlocklets = mapBlock.getValue();
+
+ // for blocklets in this block Set<BlockletId>
+ for (Number blockletId : setBlocklets) {
+
+ // add a CoarseGrainBlocklet
+ blocklets.add(new Blocklet(blockId, blockletId.toString()));
+ }
+ }
+
+ return blocklets;
+ }
+
+ @Override
+ public boolean isScanRequired(FilterResolverIntf filterExp) {
+ return true;
+ }
+
+ /**
+ * Clear complete index table and release memory.
+ */
+ @Override
+ public void clear() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
new file mode 100644
index 0000000..7f9cc1c
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * CG level of lucene DataMap
+ */
+@InterfaceAudience.Internal
+public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<CoarseGrainDataMap> {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LuceneCoarseGrainDataMapFactory.class.getName());
+
+ /**
+ * Get the datamap for segmentid
+ */
+ public List<CoarseGrainDataMap> getDataMaps(String segmentId) throws IOException {
+ List<CoarseGrainDataMap> lstDataMap = new ArrayList<>();
+ CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer);
+ try {
+ dataMap.init(new DataMapModel(
+ LuceneDataMapWriter.genDataMapStorePath(
+ tableIdentifier.getTablePath(), segmentId, dataMapName)));
+ } catch (MemoryException e) {
+ LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
+ return lstDataMap;
+ }
+ lstDataMap.add(dataMap);
+ return lstDataMap;
+ }
+
+ /**
+ * Get datamaps for distributable object.
+ */
+ public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+ throws IOException {
+ return getDataMaps(distributable.getSegmentId());
+ }
+
+ @Override
+ public DataMapLevel getDataMapType() {
+ return DataMapLevel.CG;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
new file mode 100644
index 0000000..19e4035
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+
+@InterfaceAudience.Internal
+class LuceneDataMapDistributable extends DataMapDistributable {
+
+ // TODO: seems no one use this?
+ private String dataPath;
+
+ LuceneDataMapDistributable(String dataPath) {
+ this.dataPath = dataPath;
+ }
+
+ public String getDataPath() {
+ return dataPath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
new file mode 100644
index 0000000..5eb7054
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * Base implementation for CG and FG lucene DataMapFactory.
+ */
+@InterfaceAudience.Internal
+abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFactory<T> {
+
+ /**
+ * Logger
+ */
+ final LogService LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
+
+ /**
+ * table's index columns
+ */
+ DataMapMeta dataMapMeta = null;
+
+ /**
+ * analyzer for lucene
+ */
+ Analyzer analyzer = null;
+
+ /**
+ * index name
+ */
+ String dataMapName = null;
+
+ /**
+ * table identifier
+ */
+ AbsoluteTableIdentifier tableIdentifier = null;
+
+ @Override
+ public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+ throws IOException {
+ Objects.requireNonNull(identifier);
+ Objects.requireNonNull(dataMapSchema);
+
+ this.tableIdentifier = identifier;
+ this.dataMapName = dataMapSchema.getDataMapName();
+
+ // get carbonmetadata from carbonmetadata instance
+ CarbonMetadata carbonMetadata = CarbonMetadata.getInstance();
+
+ String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+
+ // get carbon table
+ CarbonTable carbonTable = carbonMetadata.getCarbonTable(tableUniqueName);
+ if (carbonTable == null) {
+ String errorMessage =
+ String.format("failed to get carbon table with name %s", tableUniqueName);
+ LOGGER.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ TableInfo tableInfo = carbonTable.getTableInfo();
+ List<ColumnSchema> lstColumnSchemas = tableInfo.getFactTable().getListOfColumns();
+
+ // currently add all columns into lucene indexer
+ // TODO:only add index columns
+ List<String> indexedColumns = new ArrayList<String>();
+ for (ColumnSchema columnSchema : lstColumnSchemas) {
+ if (!columnSchema.isInvisible()) {
+ indexedColumns.add(columnSchema.getColumnName());
+ }
+ }
+
+ // get indexed columns
+ // Map<String, String> properties = dataMapSchema.getProperties();
+ // String columns = properties.get("text_column");
+ // if (columns != null) {
+ // String[] columnArray = columns.split(CarbonCommonConstants.COMMA, -1);
+ // Collections.addAll(indexedColumns, columnArray);
+ // }
+
+ // add optimizedOperations
+ List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
+ // optimizedOperations.add(ExpressionType.EQUALS);
+ // optimizedOperations.add(ExpressionType.GREATERTHAN);
+ // optimizedOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
+ // optimizedOperations.add(ExpressionType.LESSTHAN);
+ // optimizedOperations.add(ExpressionType.LESSTHAN_EQUALTO);
+ // optimizedOperations.add(ExpressionType.NOT);
+ optimizedOperations.add(ExpressionType.TEXT_MATCH);
+ this.dataMapMeta = new DataMapMeta(indexedColumns, optimizedOperations);
+
+ // get analyzer
+ // TODO: how to get analyzer ?
+ analyzer = new StandardAnalyzer();
+ }
+
+ /**
+ * Return a new writer for this datamap
+ */
+ public DataMapWriter createWriter(String segmentId, String writeDirectoryPath) {
+ LOGGER.info("lucene data write to " + writeDirectoryPath);
+ return new LuceneDataMapWriter(
+ tableIdentifier, dataMapName, segmentId, writeDirectoryPath, true);
+ }
+
+ /**
+ * Get all distributable objects of a segmentid
+ */
+ public List<DataMapDistributable> toDistributable(String segmentId) {
+ List<DataMapDistributable> lstDataMapDistribute = new ArrayList<DataMapDistributable>();
+ DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
+ CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segmentId));
+ lstDataMapDistribute.add(luceneDataMapDistributable);
+ return lstDataMapDistribute;
+ }
+
+ public void fireEvent(Event event) {
+
+ }
+
+ /**
+ * Clears datamap of the segment
+ */
+ public void clear(String segmentId) {
+
+ }
+
+ /**
+ * Clear all datamaps from memory
+ */
+ public void clear() {
+
+ }
+
+ /**
+ * Return metadata of this datamap
+ */
+ public DataMapMeta getMeta() {
+ return dataMapMeta;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
new file mode 100644
index 0000000..849fc2e
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoublePoint;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.IntRangeField;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+/**
+ * Implementation to write lucene index while loading
+ */
+@InterfaceAudience.Internal
+public class LuceneDataMapWriter extends DataMapWriter {
+ /**
+ * logger
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
+
+ /**
+ * index writer
+ */
+ private IndexWriter indexWriter = null;
+
+ private Analyzer analyzer = null;
+
+ private String blockId = null;
+
+ private String dataMapName = null;
+
+ private boolean isFineGrain = true;
+
+ private static final String BLOCKID_NAME = "blockId";
+
+ private static final String BLOCKLETID_NAME = "blockletId";
+
+ private static final String PAGEID_NAME = "pageId";
+
+ private static final String ROWID_NAME = "rowId";
+
+ LuceneDataMapWriter(AbsoluteTableIdentifier identifier, String dataMapName, String segmentId,
+ String writeDirectoryPath, boolean isFineGrain) {
+ super(identifier, segmentId, writeDirectoryPath);
+ this.dataMapName = dataMapName;
+ this.isFineGrain = isFineGrain;
+ }
+
+ private String getIndexPath() {
+ if (isFineGrain) {
+ return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName);
+ } else {
+ // TODO: where write data in coarse grain data map
+ return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName);
+ }
+ }
+
+ /**
+ * Start of new block notification.
+ */
+ public void onBlockStart(String blockId) throws IOException {
+ // save this block id for lucene index , used in onPageAdd function
+ this.blockId = blockId;
+
+ // get index path, put index data into segment's path
+ String strIndexPath = getIndexPath();
+ Path indexPath = FileFactory.getPath(strIndexPath);
+ FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+ // if the index path does not exist, create it
+ if (!fs.exists(indexPath)) {
+ fs.mkdirs(indexPath);
+ }
+
+ if (null == analyzer) {
+ analyzer = new StandardAnalyzer();
+ }
+
+ // create an index writer
+ Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+ indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
+
+ }
+
+ /**
+ * End of block notification
+ */
+ public void onBlockEnd(String blockId) throws IOException {
+ // clean this block id
+ this.blockId = null;
+
+ // finished a file , close this index writer
+ if (indexWriter != null) {
+ indexWriter.close();
+ }
+
+ }
+
+ /**
+ * Start of new blocklet notification.
+ */
+ public void onBlockletStart(int blockletId) {
+
+ }
+
+ /**
+ * End of blocklet notification
+ */
+ public void onBlockletEnd(int blockletId) {
+
+ }
+
+ /**
+ * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+ * DataMapMeta returned in DataMapFactory.
+ * Implementation should copy the content of `pages` as needed, because `pages` memory
+ * may be freed after this method returns, if using unsafe column page.
+ */
+ public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) throws IOException {
+ // save index data into ram, write into disk after one page finished
+ RAMDirectory ramDir = new RAMDirectory();
+ IndexWriter ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
+
+ int columnsCount = pages.length;
+ if (columnsCount <= 0) {
+ LOGGER.warn("empty data");
+ ramIndexWriter.close();
+ ramDir.close();
+ return;
+ }
+ int pageSize = pages[0].getPageSize();
+ for (int rowId = 0; rowId < pageSize; rowId++) {
+ // create a new document
+ Document doc = new Document();
+
+ // add block id, save this id
+ doc.add(new StringField(BLOCKID_NAME, blockId, Field.Store.YES));
+
+ // add blocklet Id
+ doc.add(new IntPoint(BLOCKLETID_NAME, new int[] { blockletId }));
+ doc.add(new StoredField(BLOCKLETID_NAME, blockletId));
+ //doc.add(new NumericDocValuesField(BLOCKLETID_NAME,blockletId));
+
+ // add page id and row id in Fine Grain data map
+ if (isFineGrain) {
+ // add page Id
+ doc.add(new IntPoint(PAGEID_NAME, new int[] { pageId }));
+ doc.add(new StoredField(PAGEID_NAME, pageId));
+ //doc.add(new NumericDocValuesField(PAGEID_NAME,pageId));
+
+ // add row id
+ doc.add(new IntPoint(ROWID_NAME, new int[] { rowId }));
+ doc.add(new StoredField(ROWID_NAME, rowId));
+ //doc.add(new NumericDocValuesField(ROWID_NAME,rowId));
+ }
+
+ // add other fields
+ for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
+ if (!pages[colIdx].getNullBits().get(rowId)) {
+ addField(doc, pages[colIdx], rowId, Field.Store.NO);
+ }
+ }
+
+ // add this document
+ ramIndexWriter.addDocument(doc);
+
+ }
+ // close ram writer
+ ramIndexWriter.close();
+
+ // add ram index data into disk
+ indexWriter.addIndexes(new Directory[] { ramDir });
+
+ // delete this ram data
+ ramDir.close();
+ }
+
+ private boolean addField(Document doc, ColumnPage page, int rowId, Field.Store store) {
+ //get field name
+ String fieldName = page.getColumnSpec().getFieldName();
+
+ //get field type
+ DataType type = page.getDataType();
+
+ if (type == DataTypes.BYTE) {
+ // byte type , use int range to deal with byte, lucene has no byte type
+ byte value = page.getByte(rowId);
+ IntRangeField field =
+ new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
+ field.setIntValue(value);
+ doc.add(field);
+
+ // if need store it , add StoredField
+ if (store == Field.Store.YES) {
+ doc.add(new StoredField(fieldName, (int) value));
+ }
+ } else if (type == DataTypes.SHORT) {
+ // short type , use int range to deal with short type, lucene has no short type
+ short value = page.getShort(rowId);
+ IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
+ new int[] { Short.MAX_VALUE });
+ field.setShortValue(value);
+ doc.add(field);
+
+ // if need store it , add StoredField
+ if (store == Field.Store.YES) {
+ doc.add(new StoredField(fieldName, (int) value));
+ }
+ } else if (type == DataTypes.INT) {
+ // int type , use int point to deal with int type
+ int value = page.getInt(rowId);
+ doc.add(new IntPoint(fieldName, new int[] { value }));
+
+ // if need store it , add StoredField
+ if (store == Field.Store.YES) {
+ doc.add(new StoredField(fieldName, value));
+ }
+ } else if (type == DataTypes.LONG) {
+ // long type , use long point to deal with long type
+ long value = page.getLong(rowId);
+ doc.add(new LongPoint(fieldName, new long[] { value }));
+
+ // if need store it , add StoredField
+ if (store == Field.Store.YES) {
+ doc.add(new StoredField(fieldName, value));
+ }
+ } else if (type == DataTypes.FLOAT) {
+ float value = page.getFloat(rowId);
+ doc.add(new FloatPoint(fieldName, new float[] { value }));
+ if (store == Field.Store.YES) {
+ // use StoredField here; FloatPoint would only add a duplicate index-only field
+ doc.add(new StoredField(fieldName, value));
+ }
+ } else if (type == DataTypes.DOUBLE) {
+ double value = page.getDouble(rowId);
+ doc.add(new DoublePoint(fieldName, new double[] { value }));
+ if (store == Field.Store.YES) {
+ // use StoredField here; DoublePoint would only add a duplicate index-only field
+ doc.add(new StoredField(fieldName, value));
+ }
+ } else if (type == DataTypes.STRING) {
+ byte[] value = page.getBytes(rowId);
+ // TODO: how to get string value
+ String strValue = null;
+ try {
+ strValue = new String(value, 2, value.length - 2, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ doc.add(new TextField(fieldName, strValue, store));
+ } else if (type == DataTypes.DATE) {
+ // TODO: how to get data value
+ } else if (type == DataTypes.TIMESTAMP) {
+ // TODO: how to get
+ } else if (type == DataTypes.BOOLEAN) {
+ boolean value = page.getBoolean(rowId);
+ IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
+ field.setIntValue(value ? 1 : 0);
+ doc.add(field);
+ if (store == Field.Store.YES) {
+ doc.add(new StoredField(fieldName, value ? 1 : 0));
+ }
+ } else {
+ LOGGER.error("unsupport data type " + type);
+ throw new RuntimeException("unsupported data type " + type);
+ }
+ return true;
+ }
+
+ /**
+ * This is called during closing of writer.So after this call no more data will be sent to this
+ * class.
+ */
+ public void finish() throws IOException {
+
+ }
+
+ /**
+ * Return store path for datamap
+ */
+ static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) {
+ return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
+ }
+
+}
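onPageAdded above buffers one page's documents in an in-memory RAMDirectory and then merges that segment into the on-disk index with addIndexes. Below is a minimal self-contained sketch of the same pattern against plain Lucene 6.x (no Carbon types; an in-memory main index stands in for the HdfsDirectory):

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    public class RamBufferSketch {
      public static void main(String[] args) throws Exception {
        Directory mainDir = new RAMDirectory(); // stands in for HdfsDirectory
        IndexWriter mainWriter =
            new IndexWriter(mainDir, new IndexWriterConfig(new StandardAnalyzer()));

        // buffer one "page" of documents in a throwaway in-memory index
        RAMDirectory ramDir = new RAMDirectory();
        IndexWriter ramWriter =
            new IndexWriter(ramDir, new IndexWriterConfig(new StandardAnalyzer()));
        Document doc = new Document();
        doc.add(new StringField("blockId", "part-0-0", Field.Store.YES));
        ramWriter.addDocument(doc);
        ramWriter.close();

        // merge the buffered segment into the main index, then drop the buffer
        mainWriter.addIndexes(ramDir);
        mainWriter.close();
        ramDir.close();
      }
    }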
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
new file mode 100644
index 0000000..c649545
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainBlocklet;
+import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+@InterfaceAudience.Internal
+public class LuceneFineGrainDataMap extends FineGrainDataMap {
+
+ private static final int BLOCKID_ID = 0;
+
+ private static final int BLOCKLETID_ID = 1;
+
+ private static final int PAGEID_ID = 2;
+
+ private static final int ROWID_ID = 3;
+
+ /**
+ * log information
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LuceneFineGrainDataMap.class.getName());
+
+ /**
+ * searcher object for this datamap
+ */
+ private IndexSearcher indexSearcher = null;
+
+ /**
+ * default max values to return
+ */
+ private static int MAX_RESULT_NUMBER = 100;
+
+ /**
+ * analyzer for lucene index
+ */
+ private Analyzer analyzer;
+
+ LuceneFineGrainDataMap(Analyzer analyzer) {
+ this.analyzer = analyzer;
+ }
+
+ /**
+ * It is called to load the data map to memory or to initialize it.
+ */
+ public void init(DataMapModel dataMapModel) throws MemoryException, IOException {
+ // get this path from file path
+ Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
+
+ LOGGER.info("Lucene index read path " + indexPath.toString());
+
+ // get the file system; use the HDFS file system implemented in the Solr project
+ FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+ // check this path valid
+ if (!fs.exists(indexPath)) {
+ String errorMessage = String.format("index directory %s not exists.", indexPath);
+ LOGGER.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ if (!fs.isDirectory(indexPath)) {
+ String errorMessage = String.format("error index path %s, must be directory", indexPath);
+ LOGGER.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ // open this index path , use HDFS default configuration
+ Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+ IndexReader indexReader = DirectoryReader.open(indexDir);
+ if (indexReader == null) {
+ throw new RuntimeException("failed to create index reader object");
+ }
+
+ // create an index searcher object
+ indexSearcher = new IndexSearcher(indexReader);
+ }
+
+ /**
+ * Return the query string in the first TEXT_MATCH expression in the expression tree
+ */
+ private String getQueryString(Expression expression) {
+ if (expression.getFilterExpressionType() == ExpressionType.TEXT_MATCH) {
+ return expression.getString();
+ }
+
+ for (Expression child : expression.getChildren()) {
+ String queryString = getQueryString(child);
+ if (queryString != null) {
+ return queryString;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Prune the datamap with filter expression. It returns the list of
+ * blocklets where these filters can exist.
+ */
+ @Override
+ public List<FineGrainBlocklet> prune(FilterResolverIntf filterExp,
+ SegmentProperties segmentProperties, List<String> partitions) throws IOException {
+
+ // convert filter expr into lucene list query
+ List<String> fields = new ArrayList<String>();
+
+ // extract the lucene query string from the TEXT_MATCH expression
+ String strQuery = getQueryString(filterExp.getFilterExpression());
+
+ String[] sFields = new String[fields.size()];
+ fields.toArray(sFields);
+
+ // get analyzer
+ if (analyzer == null) {
+ analyzer = new StandardAnalyzer();
+ }
+
+ // use MultiFieldQueryParser to parse the query
+ QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer);
+ Query query;
+ try {
+ query = queryParser.parse(strQuery);
+ } catch (ParseException e) {
+ String errorMessage = String.format(
+ "failed to filter block with query %s, detail is %s", strQuery, e.getMessage());
+ LOGGER.error(errorMessage);
+ return null;
+ }
+
+ // execute index search
+ TopDocs result;
+ try {
+ result = indexSearcher.search(query, MAX_RESULT_NUMBER);
+ } catch (IOException e) {
+ String errorMessage =
+ String.format("failed to search lucene data, detail is %s", e.getMessage());
+ LOGGER.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ // temporary data, delete duplicated data
+ // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
+ Map<String, Map<String, Map<Integer, Set<Integer>>>> mapBlocks = new HashMap<>();
+
+ for (ScoreDoc scoreDoc : result.scoreDocs) {
+ // get a document
+ Document doc = indexSearcher.doc(scoreDoc.doc);
+
+ // get all fields
+ List<IndexableField> fieldsInDoc = doc.getFields();
+
+ // get this block id Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
+ String blockId = fieldsInDoc.get(BLOCKID_ID).stringValue();
+ Map<String, Map<Integer, Set<Integer>>> mapBlocklets = mapBlocks.get(blockId);
+ if (mapBlocklets == null) {
+ mapBlocklets = new HashMap<>();
+ mapBlocks.put(blockId, mapBlocklets);
+ }
+
+ // get the blocklet id Map<BlockletId, Map<PageId, Set<RowId>>>
+ String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue();
+ Map<Integer, Set<Integer>> mapPageIds = mapBlocklets.get(blockletId);
+ if (mapPageIds == null) {
+ mapPageIds = new HashMap<>();
+ mapBlocklets.put(blockletId, mapPageIds);
+ }
+
+ // get the page id Map<PageId, Set<RowId>>
+ Number pageId = fieldsInDoc.get(PAGEID_ID).numericValue();
+ Set<Integer> setRowId = mapPageIds.get(pageId.intValue());
+ if (setRowId == null) {
+ setRowId = new HashSet<>();
+ mapPageIds.put(pageId.intValue(), setRowId);
+ }
+
+ // get the row id Set<RowId>
+ Number rowId = fieldsInDoc.get(ROWID_ID).numericValue();
+ setRowId.add(rowId.intValue());
+ }
+
+ // result blocklets
+ List<FineGrainBlocklet> blocklets = new ArrayList<>();
+
+ // transform all blocks into result type blocklets
+ // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
+ for (Map.Entry<String, Map<String, Map<Integer, Set<Integer>>>> mapBlock :
+ mapBlocks.entrySet()) {
+ String blockId = mapBlock.getKey();
+ Map<String, Map<Integer, Set<Integer>>> mapBlocklets = mapBlock.getValue();
+ // for blocklets in this block Map<BlockletId, Map<PageId, Set<RowId>>>
+ for (Map.Entry<String, Map<Integer, Set<Integer>>> mapBlocklet : mapBlocklets.entrySet()) {
+ String blockletId = mapBlocklet.getKey();
+ Map<Integer, Set<Integer>> mapPageIds = mapBlocklet.getValue();
+ List<FineGrainBlocklet.Page> pages = new ArrayList<FineGrainBlocklet.Page>();
+
+ // for pages in this blocklet Map<PageId, Set<RowId>>>
+ for (Map.Entry<Integer, Set<Integer>> mapPageId : mapPageIds.entrySet()) {
+ // construct array rowid
+ int[] rowIds = new int[mapPageId.getValue().size()];
+ int i = 0;
+ // for rowids in this page Set<RowId>
+ for (Integer rowid : mapPageId.getValue()) {
+ rowIds[i++] = rowid;
+ }
+ // construct one page
+ FineGrainBlocklet.Page page = new FineGrainBlocklet.Page();
+ page.setPageId(mapPageId.getKey());
+ page.setRowId(rowIds);
+
+ // add this page into list pages
+ pages.add(page);
+ }
+
+ // add a FineGrainBlocklet
+ blocklets.add(new FineGrainBlocklet(blockId, blockletId, pages));
+ }
+ }
+
+ return blocklets;
+ }
+
+ @Override
+ public boolean isScanRequired(FilterResolverIntf filterExp) {
+ return true;
+ }
+
+ /**
+ * Clear complete index table and release memory.
+ */
+ @Override
+ public void clear() {
+
+ }
+
+}
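To make the stored-field layout concrete, the self-contained sketch below (plain Lucene 6.x, in-memory, no HDFS) writes one document carrying the same four identifiers the writer emits and reads them back with the stringValue()/numericValue() accessors that prune() relies on; field names and values are illustrative:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StoredField;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.document.TextField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.queryparser.classic.QueryParser;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TopDocs;
    import org.apache.lucene.store.RAMDirectory;

    public class FineGrainLayoutSketch {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        StandardAnalyzer analyzer = new StandardAnalyzer();
        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(analyzer));

        // one row: block "part-0-0", blocklet 0, page 2, row 7, plus a text column
        Document doc = new Document();
        doc.add(new StringField("blockId", "part-0-0", Field.Store.YES));
        doc.add(new StoredField("blockletId", 0));
        doc.add(new StoredField("pageId", 2));
        doc.add(new StoredField("rowId", 7));
        doc.add(new TextField("name", "n10", Field.Store.NO));
        writer.addDocument(doc);
        writer.close();

        IndexSearcher searcher = new IndexSearcher(DirectoryReader.open(dir));
        TopDocs hits = searcher.search(
            new QueryParser("name", analyzer).parse("name:n10"), 10);
        for (ScoreDoc hit : hits.scoreDocs) {
          Document d = searcher.doc(hit.doc);
          // same accessors prune() uses to rebuild block/blocklet/page/row positions
          System.out.println(d.getField("blockId").stringValue()
              + " blocklet=" + d.getField("blockletId").numericValue()
              + " page=" + d.getField("pageId").numericValue()
              + " row=" + d.getField("rowId").numericValue());
        }
      }
    }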
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
new file mode 100644
index 0000000..e35d5bf
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * FG level of lucene DataMap
+ */
+@InterfaceAudience.Internal
+public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<FineGrainDataMap> {
+
+ /**
+ * Get the datamaps for the given segment id
+ */
+ public List<FineGrainDataMap> getDataMaps(String segmentId) throws IOException {
+ List<FineGrainDataMap> lstDataMap = new ArrayList<>();
+ FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
+ try {
+ dataMap.init(new DataMapModel(
+ LuceneDataMapWriter.genDataMapStorePath(
+ tableIdentifier.getTablePath(), segmentId, dataMapName)));
+ } catch (MemoryException e) {
+ LOGGER.error("failed to get lucene datamap, detail is " + e.getMessage());
+ return lstDataMap;
+ }
+ lstDataMap.add(dataMap);
+ return lstDataMap;
+ }
+
+ /**
+ * Get datamaps for distributable object.
+ */
+ public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable)
+ throws IOException {
+ return getDataMaps(distributable.getSegmentId());
+ }
+
+ @Override
+ public DataMapLevel getDataMapType() {
+ return DataMapLevel.FG;
+ }
+
+}
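A small sketch of the factory contract, limited to what this class exposes; getDataMaps needs the base class initialized against a table first, so that call stays commented out here.

  import org.apache.carbondata.core.datamap.DataMapLevel
  import org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory

  object FactorySketch extends App {
    val factory = new LuceneFineGrainDataMapFactory()
    // FG routes pruning through FineGrainBlocklet results rather than whole blocklets
    assert(factory.getDataMapType == DataMapLevel.FG)
    // val dataMaps = factory.getDataMaps("0") // requires factory init against a table
  }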
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
new file mode 100644
index 0000000..a461f04
--- /dev/null
+++ b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+
+ val file2 = resourcesPath + "/datamap_input.csv"
+
+ override protected def beforeAll(): Unit = {
+ // n should be about 5000000 to exercise multiple blocklets if the default size of 1024 is used
+ val n = 15000
+ LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql(
+ """
+ | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+ }
+
+ test("test lucene coarse grain data map") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test
+ | USING '${classOf[LuceneCoarseGrainDataMapFactory].getName}'
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+
+ checkAnswer(sql("select * from datamap_test where name='n502670'"),
+ sql("select * from normal_test where name='n502670'"))
+ }
+
+ override protected def afterAll(): Unit = {
+ LuceneFineGrainDataMapSuite.deleteFile(file2)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql("DROP TABLE IF EXISTS datamap_test")
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
new file mode 100644
index 0000000..4766281
--- /dev/null
+++ b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import java.io.{File, PrintWriter}
+
+import scala.util.Random
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.metadata.CarbonMetadata
+
+class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+
+ val file2 = resourcesPath + "/datamap_input.csv"
+
+ override protected def beforeAll(): Unit = {
+ // n should be about 5000000 to exercise multiple blocklets if the default size of 1024 is used
+ val n = 15000
+ LuceneFineGrainDataMapSuite.createFile(file2)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql(
+ """
+ | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+ }
+
+ test("test lucene fine grain data map") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test
+ | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+
+ sql("SELECT * FROM datamap_test ORDER BY id").show
+
+ // sql("select * from normal_test where name='n34000'").show
+ sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')").show
+ sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')").show
+ sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')").show
+
+ // checkAnswer(
+ // sql("select * from datamap_test where match('name:n34000')"),
+ // sql("select * from normal_test where name='n34000'"))
+ }
+
+ override protected def afterAll(): Unit = {
+ LuceneFineGrainDataMapSuite.deleteFile(file2)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql("DROP TABLE IF EXISTS datamap_test")
+ }
+}
+
+object LuceneFineGrainDataMapSuite {
+ def createFile(fileName: String, line: Int = 10000, start: Int = 0): Unit = {
+ val write = new PrintWriter(new File(fileName))
+ for (i <- start until (start + line)) {
+ write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80))
+ }
+ write.close()
+ }
+
+ def deleteFile(fileName: String): Unit = {
+ val file = new File(fileName)
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+}
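For reference, createFile above emits comma-separated rows of the form id,n<id>,c0<id>,<random age below 80>; for example createFile(path, line = 3, start = 10) would write (ages illustrative, since they are random):

  10,n10,c010,42
  11,n11,c011,7
  12,n12,c012,63

which is why TEXT_MATCH('name:n10*') in the test matches n10, n100 through n109, and so on.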
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index d1c04ae..b7f19fd 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -99,6 +99,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-lucene</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 84384b7..dd526f4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -246,7 +246,7 @@ class FGDataMap extends FineGrainDataMap {
* Clear complete index table and release memory.
*/
override def clear():Unit = {
- ???
+
}
override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
index 10955a3..ea571d7 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.datamap;
+import java.io.IOException;
+
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -62,7 +64,7 @@ public interface DataMapProvider {
* Implementation should initialize metadata for datamap, like creating table
*/
void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
- SparkSession sparkSession) throws MalformedDataMapCommandException;
+ SparkSession sparkSession) throws MalformedDataMapCommandException, IOException;
/**
* Initialize a datamap's data.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index 2a6a70a..c7651bb 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.datamap;
+import java.io.IOException;
+
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.MetadataProcessException;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -37,7 +39,7 @@ public class IndexDataMapProvider implements DataMapProvider {
@Override
public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
- SparkSession sparkSession) throws MalformedDataMapCommandException {
+ SparkSession sparkSession) throws MalformedDataMapCommandException, IOException {
DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
DataMapStoreManager.getInstance().registerDataMap(
mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java
new file mode 100644
index 0000000..fceb729
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.expression;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+@InterfaceAudience.Internal
+public class MatchExpression extends Expression {
+ private String queryString;
+
+ public MatchExpression(String queryString) {
+ this.queryString = queryString;
+ }
+
+ @Override
+ public ExpressionResult evaluate(RowIntf value)
+ throws FilterUnsupportedException, FilterIllegalMemberException {
+ return null;
+ }
+
+ @Override
+ public ExpressionType getFilterExpressionType() {
+ return ExpressionType.TEXT_MATCH;
+ }
+
+ @Override
+ public void findAndSetChild(Expression oldExpr, Expression newExpr) {
+
+ }
+
+ @Override
+ public String getString() {
+ return queryString;
+ }
+}
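MatchExpression is deliberately inert at row level: evaluate() returns null and is never exercised, while getString() hands the raw Lucene query to the datamap layer. A tiny sketch of that contract, using only the members defined above:

  import org.apache.carbondata.core.scan.filter.intf.ExpressionType
  import org.apache.carbondata.datamap.expression.MatchExpression

  object MatchExpressionSketch extends App {
    val expr = new MatchExpression("name:n10*")
    assert(expr.getFilterExpressionType == ExpressionType.TEXT_MATCH) // routed as TEXT_MATCH
    assert(expr.getString == "name:n10*") // the query string passes through untouched
  }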
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala
new file mode 100644
index 0000000..093e479
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap
+
+import org.apache.spark.sql.sources.Filter
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+
+@InterfaceAudience.Internal
+class TextMatchUDF extends ((String) => Boolean) with Serializable {
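+ // placeholder body: it accepts any non-empty argument; when the predicate
+ // is pushed down, matching rows are selected by the Lucene datamap instead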
+ override def apply(v1: String): Boolean = {
+ v1.length > 0
+ }
+}
+
+@InterfaceAudience.Internal
+case class TextMatch(queryString: String) extends Filter {
+ override def references: Array[String] = null
+}
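Putting the pieces together, a hedged end-to-end usage sketch; it assumes a SparkSession where CarbonEnv has registered text_match, and the table, datamap, and CSV path names are illustrative:

  spark.sql("CREATE TABLE t(id INT, name STRING) STORED BY 'carbondata'")
  spark.sql(
    "CREATE DATAMAP dm ON TABLE t USING " +
      "'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'")
  spark.sql("LOAD DATA LOCAL INPATH '/tmp/t.csv' INTO TABLE t OPTIONS('header'='false')")
  // translated to TextMatch, then MatchExpression, and answered by the Lucene index
  spark.sql("SELECT * FROM t WHERE TEXT_MATCH('name:bob*')").show()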
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 5fb3d56..9fcd28b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util._
+import org.apache.carbondata.datamap.TextMatchUDF
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.spark.rdd.SparkReadSupport
@@ -66,6 +67,10 @@ class CarbonEnv {
// column to sum and count.
sparkSession.udf.register("preAggLoad", () => "")
+ // register for lucene datamap
+ // TODO: move it to proper place, it should be registered by datamap implementation
+ sparkSession.udf.register("text_match", new TextMatchUDF)
+
// added for handling timeseries function like hour, minute, day , month , year
sparkSession.udf.register("timeseries", new TimeSeriesFunction)
synchronized {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4b2bfa9..2f640cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -35,11 +35,13 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.BucketingInfo
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.datamap.{TextMatch, TextMatchUDF}
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -523,6 +525,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
*/
protected[sql] def translateFilter(predicate: Expression, or: Boolean = false): Option[Filter] = {
predicate match {
+ case u: ScalaUDF if u.function.isInstanceOf[TextMatchUDF] =>
+ if (u.children.size > 1) {
+ throw new MalformedCarbonCommandException(
+ "TEXT_MATCH UDF syntax: TEXT_MATCH('luceneQuerySyntax')")
+ }
+ Some(TextMatch(u.children.head.toString()))
+
case or@Or(left, right) =>
val leftFilter = translateFilter(left, true)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index c062cfb..936f521 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -40,6 +40,8 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.datamap.TextMatch
+import org.apache.carbondata.datamap.expression.MatchExpression
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -133,6 +135,8 @@ object CarbonFilters {
}, ExpressionType.CONTAINSWITH))
case CastExpr(expr: Expression) =>
Some(transformExpression(expr))
+ case TextMatch(queryString) =>
+ Some(new MatchExpression(queryString))
case _ => None
}
}
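Taken together, the last three hunks complete the pushdown chain: CarbonLateDecodeStrategy.translateFilter intercepts the text_match ScalaUDF and emits a TextMatch source filter, which CarbonFilters then converts into a MatchExpression so the Lucene datamap can answer the predicate during pruning instead of Spark evaluating the UDF row by row.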
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0891972..fdc9210 100644
--- a/pom.xml
+++ b/pom.xml
@@ -342,6 +342,7 @@
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
<configuration>
+ <skip>true</skip>
<excludeFilterFile>${dev.path}/findbugs-exclude.xml</excludeFilterFile>
<failOnError>true</failOnError>
<findbugsXmlOutput>true</findbugsXmlOutput>
@@ -481,6 +482,7 @@
<module>streaming</module>
<module>examples/spark2</module>
<module>datamap/examples</module>
+ <module>datamap/lucene</module>
</modules>
<build>
<plugins>
@@ -535,6 +537,7 @@
<module>integration/presto</module>
<module>streaming</module>
<module>examples/spark2</module>
+ <module>datamap/lucene</module>
</modules>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index d9ffd03..7a070a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -84,7 +84,7 @@ public class DataMapWriterListener {
LOG.info("DataMapWriter " + writer + " added");
}
- public void onBlockStart(String blockId, String blockPath) {
+ public void onBlockStart(String blockId, String blockPath) throws IOException {
for (List<DataMapWriter> writers : registry.values()) {
for (DataMapWriter writer : writers) {
writer.onBlockStart(blockId);
@@ -92,7 +92,7 @@ public class DataMapWriterListener {
}
}
- public void onBlockEnd(String blockId) {
+ public void onBlockEnd(String blockId) throws IOException {
for (List<DataMapWriter> writers : registry.values()) {
for (DataMapWriter writer : writers) {
writer.onBlockEnd(blockId);
@@ -122,7 +122,7 @@ public class DataMapWriterListener {
* @param pageId sequence number of page, start from 0
* @param tablePage page data
*/
- public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
+ public void onPageAdded(int blockletId, int pageId, TablePage tablePage) throws IOException {
Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
List<String> indexedColumns = entry.getKey();