You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/27 12:42:06 UTC
[6/7] carbondata git commit: [CARBONDATA-1232] Datamap implementation
for Blocklet
[CARBONDATA-1232] Datamap implementation for Blocklet
This closes #1099
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b6812449
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b6812449
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b6812449
Branch: refs/heads/master
Commit: b6812449a4040dc5d3454cd0d6dd38f07be2854c
Parents: 2dbfab6
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jun 17 22:53:57 2017 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Thu Jul 27 18:47:52 2017 +0800
----------------------------------------------------------------------
.../carbondata/core/cache/CacheProvider.java | 3 +
.../apache/carbondata/core/cache/CacheType.java | 6 +
.../core/datastore/block/TableBlockInfo.java | 19 +
.../core/datastore/block/TaskBlockInfo.java | 4 +
.../carbondata/core/indexstore/Blocklet.java | 55 +-
.../indexstore/BlockletDataMapIndexStore.java | 180 ++++++
.../core/indexstore/BlockletDetailInfo.java | 117 ++++
.../carbondata/core/indexstore/DataMap.java | 8 +-
.../core/indexstore/DataMapFactory.java | 87 +++
.../core/indexstore/DataMapStoreManager.java | 90 ++-
.../carbondata/core/indexstore/DataMapType.java | 14 +-
.../TableBlockIndexUniqueIdentifier.java | 103 ++++
.../core/indexstore/TableDataMap.java | 97 +++-
.../core/indexstore/UnsafeMemoryDMStore.java | 207 +++++++
.../blockletindex/BlockletDMComparator.java | 134 +++++
.../blockletindex/BlockletDataMap.java | 445 +++++++++++++++
.../blockletindex/BlockletDataMapFactory.java | 115 ++++
.../BlockletDataRefNodeWrapper.java | 137 +++++
.../indexstore/blockletindex/IndexWrapper.java | 49 ++
.../core/indexstore/row/DataMapRow.java | 89 +++
.../core/indexstore/row/DataMapRowImpl.java | 106 ++++
.../core/indexstore/row/UnsafeDataMapRow.java | 133 +++++
.../core/indexstore/schema/DataMapSchema.java | 124 ++++
.../core/indexstore/schema/FilterType.java | 24 +
.../core/metadata/blocklet/BlockletInfo.java | 53 +-
.../core/metadata/index/BlockIndexInfo.java | 27 +
.../executor/impl/AbstractQueryExecutor.java | 52 +-
.../executer/IncludeFilterExecuterImpl.java | 2 +-
.../executer/RangeValueFilterExecuterImpl.java | 2 +-
.../RowLevelRangeGrtThanFiterExecuterImpl.java | 2 +-
...elRangeGrtrThanEquaToFilterExecuterImpl.java | 2 +-
...velRangeLessThanEqualFilterExecuterImpl.java | 2 +-
.../RowLevelRangeLessThanFiterExecuterImpl.java | 2 +-
.../processor/AbstractDataBlockIterator.java | 3 +
.../AbstractDetailQueryResultIterator.java | 34 +-
.../util/AbstractDataFileFooterConverter.java | 53 ++
.../apache/carbondata/core/util/CarbonUtil.java | 40 +-
.../core/util/DataFileFooterConverter.java | 4 +
.../core/util/DataFileFooterConverter2.java | 3 +
.../core/util/DataFileFooterConverterV3.java | 11 +
format/src/main/thrift/carbondata_index.thrift | 1 +
.../carbondata/hadoop/CarbonInputFormat.java | 14 +-
.../carbondata/hadoop/CarbonInputSplit.java | 39 +-
.../hadoop/api/CarbonTableInputFormat.java | 562 ++++++++++++++++---
.../hadoop/util/CarbonInputFormatUtil.java | 7 +-
.../presto/impl/CarbonTableReader.java | 56 +-
.../spark/rdd/CarbonIUDMergerRDD.scala | 5 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 9 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 22 +-
.../carbondata/spark/util/QueryPlanUtil.scala | 10 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 14 +-
.../sql/execution/command/IUDCommands.scala | 7 -
.../carbondata/spark/util/QueryPlanUtil.scala | 10 +-
.../processing/merger/CarbonCompactionUtil.java | 32 ++
54 files changed, 3167 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 25a8976..5c4b265 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.BlockIndexStore;
import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexStore;
import org.apache.carbondata.core.util.CarbonProperties;
/**
@@ -126,6 +127,8 @@ public class CacheProvider {
} else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
cacheObject =
new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache);
+ } else if (cacheType.equals(cacheType.DRIVER_BLOCKLET_DATAMAP)) {
+ cacheObject = new BlockletDataMapIndexStore(carbonStorePath, carbonLRUCache);
}
cacheTypeToCacheMap.put(cacheType, cacheObject);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
index 2d6570d..ab51ff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
@@ -56,6 +56,12 @@ public class CacheType<K, V> {
DRIVER_BTREE = new CacheType("driver_btree");
/**
+ * Driver cache which maintains the blocklet datamap index
+ */
+ public static final CacheType<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+ DRIVER_BLOCKLET_DATAMAP = new CacheType("driver_blocklet_datamap");
+
+ /**
* cacheName which is unique name for a cache
*/
private String cacheName;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 1da6699..316e202 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -78,6 +79,8 @@ public class TableBlockInfo implements Distributable, Serializable {
*/
private String[] deletedDeltaFilePath;
+ private BlockletDetailInfo detailInfo;
+
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
this.filePath = FileFactory.getUpdatedFilePath(filePath);
@@ -89,6 +92,10 @@ public class TableBlockInfo implements Distributable, Serializable {
this.deletedDeltaFilePath = deletedDeltaFilePath;
}
+ public TableBlockInfo() {
+
+ }
+
/**
* constructor to initialize the TbaleBlockInfo with BlockletInfos
*
@@ -322,4 +329,16 @@ public class TableBlockInfo implements Distributable, Serializable {
public String[] getDeletedDeltaFilePath() {
return deletedDeltaFilePath;
}
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
index eb707c2..4fcec87 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.datastore.block;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,9 @@ public class TaskBlockInfo {
return taskBlockInfoMapping.keySet();
}
+ public Collection<List<TableBlockInfo>> getAllTableBlockInfoList() {
+ return taskBlockInfoMapping.values();
+ }
/**
* returns TableBlockInfoList of given task
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 597c46c..66da4d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,27 +16,76 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.io.IOException;
import java.io.Serializable;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
/**
* Blocklet
*/
public class Blocklet implements Serializable {
- private String path;
+ private Path path;
+
+ private String segmentId;
private String blockletId;
+ private BlockletDetailInfo detailInfo;
+
+ private long length;
+
+ private String[] location;
+
public Blocklet(String path, String blockletId) {
- this.path = path;
+ this.path = new Path(path);
this.blockletId = blockletId;
}
- public String getPath() {
+ public Path getPath() {
return path;
}
public String getBlockletId() {
return blockletId;
}
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
+
+ public void updateLocations() throws IOException {
+ FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+ LocatedFileStatus fileStatus = iter.next();
+ location = fileStatus.getBlockLocations()[0].getHosts();
+ length = fileStatus.getLen();
+ }
+
+ public String[] getLocations() throws IOException {
+ return location;
+ }
+
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.segmentId = segmentId;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
new file mode 100644
index 0000000..fc8c273
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+
+/**
+ * Cache that handles loading, storing, invalidating and clearing of the
+ * blocklet datamaps, keyed by TableBlockIndexUniqueIdentifier
+ */
+public class BlockletDataMapIndexStore
+ implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
+ /**
+ * carbon store path
+ */
+ protected String carbonStorePath;
+ /**
+ * CarbonLRU cache
+ */
+ protected CarbonLRUCache lruCache;
+
+ /**
+ * Map of unique identifier to lock object. An entry is added the first time a datamap is
+ * loaded for that identifier, so that loading is synchronized per identifier while
+ * datamaps of different identifiers can still be loaded concurrently. Entries are not
+ * removed by this class once they have been added.
+ */
+ private Map<String, Object> segmentLockMap;
+
+ /**
+ * constructor to initialize the BlockletDataMapIndexStore
+ *
+ * @param carbonStorePath
+ * @param lruCache
+ */
+ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+ this.carbonStorePath = carbonStorePath;
+ this.lruCache = lruCache;
+ segmentLockMap = new ConcurrentHashMap<String, Object>();
+ }
+
+ @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+ throws IOException {
+ String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+ BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
+ if (dataMap == null) {
+ try {
+ dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
+ } catch (IndexBuilderException e) {
+ throw new IOException(e.getMessage(), e);
+ } catch (Throwable e) {
+ throw new IOException("Problem in loading segment block.", e);
+ }
+ }
+ return dataMap;
+ }
+
+ @Override public List<BlockletDataMap> getAll(
+ List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
+ List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+ try {
+ for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
+ blockletDataMaps.add(get(identifier));
+ }
+ } catch (Throwable e) {
+ for (BlockletDataMap dataMap : blockletDataMaps) {
+ dataMap.clear();
+ }
+ throw new IOException("Problem in loading segment blocks.", e);
+ }
+ return blockletDataMaps;
+ }
+
+ /**
+ * returns the BlockletDataMap held in the LRU cache for the identifier, without loading it
+ *
+ * @param tableSegmentUniqueIdentifier
+ * @return
+ */
+ @Override public BlockletDataMap getIfPresent(
+ TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+ BlockletDataMap dataMap = (BlockletDataMap) lruCache
+ .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ return dataMap;
+ }
+
+ /**
+ * Removes the datamap of the given identifier from the LRU cache
+ *
+ * @param tableSegmentUniqueIdentifier
+ */
+ @Override public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+ lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ }
+
+ /**
+ * Below method will be used to load the blocklet datamap of the given
+ * identifier. A new BlockletDataMap is initialized from the identifier's
+ * file path under the identifier-level lock and is then put into the
+ * LRU cache along with its memory size.
+ *
+ * @return the loaded blocklet datamap
+ * @throws IOException
+ */
+ private BlockletDataMap loadAndGetDataMap(
+ TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+ String uniqueTableSegmentIdentifier =
+ tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+ Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+ if (lock == null) {
+ lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+ }
+ BlockletDataMap dataMap = null;
+ synchronized (lock) {
+ dataMap = new BlockletDataMap();
+ dataMap.init(tableSegmentUniqueIdentifier.getFilePath());
+ lruCache.put(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(), dataMap,
+ dataMap.getMemorySize());
+ }
+ return dataMap;
+ }
+
+ /**
+ * Below method will be used to get the segment level lock object
+ *
+ * @param uniqueIdentifier
+ * @return lock object
+ */
+ private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
+ // get the segment lock object if it is present then return
+ // otherwise add the new lock and return
+ Object segmentLoderLockObject = segmentLockMap.get(uniqueIdentifier);
+ if (null == segmentLoderLockObject) {
+ segmentLoderLockObject = new Object();
+ segmentLockMap.put(uniqueIdentifier, segmentLoderLockObject);
+ }
+ return segmentLoderLockObject;
+ }
+
+ /**
+ * Calls clear() on the cached datamap of each of the given identifiers
+ *
+ * @param tableSegmentUniqueIdentifiers
+ */
+ @Override public void clearAccessCount(
+ List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+ for (TableBlockIndexUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+ BlockletDataMap cacheable =
+ (BlockletDataMap) lruCache.get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ cacheable.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
new file mode 100644
index 0000000..68dedd8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Blocklet detail information to be sent to each executor
+ */
+public class BlockletDetailInfo implements Serializable, Writable {
+
+ private int rowCount;
+
+ private short pagesCount;
+
+ private short versionNumber;
+
+ private int[] dimLens;
+
+ private long schemaUpdatedTimeStamp;
+
+ private BlockletInfo blockletInfo;
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(int rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ public int getPagesCount() {
+ return pagesCount;
+ }
+
+ public void setPagesCount(short pagesCount) {
+ this.pagesCount = pagesCount;
+ }
+
+ public short getVersionNumber() {
+ return versionNumber;
+ }
+
+ public void setVersionNumber(short versionNumber) {
+ this.versionNumber = versionNumber;
+ }
+
+ public BlockletInfo getBlockletInfo() {
+ return blockletInfo;
+ }
+
+ public void setBlockletInfo(BlockletInfo blockletInfo) {
+ this.blockletInfo = blockletInfo;
+ }
+
+ public int[] getDimLens() {
+ return dimLens;
+ }
+
+ public void setDimLens(int[] dimLens) {
+ this.dimLens = dimLens;
+ }
+
+ public long getSchemaUpdatedTimeStamp() {
+ return schemaUpdatedTimeStamp;
+ }
+
+ public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
+ this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
+ }
+
+ @Override public void write(DataOutput out) throws IOException {
+ out.writeInt(rowCount);
+ out.writeShort(pagesCount);
+ out.writeShort(versionNumber);
+ out.writeShort(dimLens.length);
+ for (int i = 0; i < dimLens.length; i++) {
+ out.writeInt(dimLens[i]);
+ }
+ out.writeLong(schemaUpdatedTimeStamp);
+ blockletInfo.write(out);
+ }
+
+ @Override public void readFields(DataInput in) throws IOException {
+ rowCount = in.readInt();
+ pagesCount = in.readShort();
+ versionNumber = in.readShort();
+ dimLens = new int[in.readShort()];
+ for (int i = 0; i < dimLens.length; i++) {
+ dimLens[i] = in.readInt();
+ }
+ schemaUpdatedTimeStamp = in.readLong();
+ blockletInfo = new BlockletInfo();
+ blockletInfo.readFields(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
index 2651f15..1276494 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
/**
- * Interface for adding and retrieving index data.
+ * Datamap is an entity which can store and retrieve index data.
*/
public interface DataMap {
@@ -47,6 +47,12 @@ public interface DataMap {
List<Blocklet> prune(FilterResolverIntf filterExp);
/**
+ * Convert datamap to distributable object
+ * @return
+ */
+ DataMapDistributable toDistributable();
+
+ /**
* Clear complete index table and release memory.
*/
void clear();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
new file mode 100644
index 0000000..72f714f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.util.List;
+
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for datamap factory, it is responsible for creating the datamap.
+ */
+public interface DataMapFactory {
+
+ /**
+ * Initialization of Datamap factory
+ * @param identifier
+ * @param dataMapName
+ */
+ void init(AbsoluteTableIdentifier identifier, String dataMapName);
+ /**
+ * Get the datamap writer for each segmentid.
+ *
+ * @param identifier
+ * @param segmentId
+ * @return
+ */
+ DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
+ String segmentId);
+
+ /**
+ * Get the datamap for segmentid
+ *
+ * @param segmentId
+ * @return
+ */
+ List<DataMap> getDataMaps(String segmentId);
+
+ /**
+ * Get datamap for distributable object.
+ *
+ * @param distributable
+ * @return
+ */
+ DataMap getDataMap(DataMapDistributable distributable);
+
+ /**
+ * This method checks whether the given type of filter is supported
+ * by this datamap or not
+ *
+ * @param filterType
+ * @return
+ */
+ boolean isFiltersSupported(FilterType filterType);
+
+ /**
+ * Fires the given change event on this datamap factory
+ * @param event
+ */
+ void fireEvent(ChangeEvent event);
+
+ /**
+ * Clears datamap of the segment
+ */
+ void clear(String segmentId);
+
+ /**
+ * Clear all datamaps from memory
+ */
+ void clear();
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
index 64c6e20..1a36187 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
@@ -16,7 +16,9 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogService;
@@ -24,13 +26,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
/**
- * It maintains all the index tables in it.
+ * It maintains all the DataMaps in it.
*/
-public class DataMapStoreManager {
+public final class DataMapStoreManager {
private static DataMapStoreManager instance = new DataMapStoreManager();
- private Map<DataMapType, Map<String, TableDataMap>> dataMapMappping = new HashMap<>();
+ /**
+ * Contains the list of datamaps for each table.
+ */
+ private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>();
private static final LogService LOGGER =
LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
@@ -48,56 +53,85 @@ public class DataMapStoreManager {
*/
public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
DataMapType mapType) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- TableDataMap dataMap = null;
- if (map == null) {
+ List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+ TableDataMap dataMap;
+ if (tableDataMaps == null) {
+ createTableDataMap(identifier, mapType, dataMapName);
+ tableDataMaps = dataMapMappping.get(identifier);
+ }
+ dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+ if (dataMap == null) {
throw new RuntimeException("Datamap does not exist");
- } else {
- dataMap = map.get(dataMapName);
- if (dataMap == null) {
- throw new RuntimeException("Datamap does not exist");
- }
}
- // Initialize datamap
- dataMap.init(identifier, dataMapName);
return dataMap;
}
/**
- * Create new datamap instance using datamap type and path
+ * Create new datamap instance using datamap name, datamap type and table identifier
*
* @param mapType
* @return
*/
- public TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, DataMapType mapType,
- String dataMapName) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- if (map == null) {
- map = new HashMap<>();
- dataMapMappping.put(mapType, map);
+ private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
+ DataMapType mapType, String dataMapName) {
+ List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+ if (tableDataMaps == null) {
+ tableDataMaps = new ArrayList<>();
+ dataMapMappping.put(identifier, tableDataMaps);
}
- TableDataMap dataMap = map.get(dataMapName);
+ TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
if (dataMap != null) {
throw new RuntimeException("Already datamap exists in that path with type " + mapType);
}
try {
- //TODO create datamap using @mapType.getClassName())
+ DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
+ dataMapFactory.init(identifier, dataMapName);
+ dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
} catch (Exception e) {
LOGGER.error(e);
+ throw new RuntimeException(e);
+ }
+ tableDataMaps.add(dataMap);
+ return dataMap;
+ }
+
+ private TableDataMap getAbstractTableDataMap(String dataMapName,
+ List<TableDataMap> tableDataMaps) {
+ TableDataMap dataMap = null;
+ for (TableDataMap tableDataMap: tableDataMaps) {
+ if (tableDataMap.getDataMapName().equals(dataMapName)) {
+ dataMap = tableDataMap;
+ break;
+ }
}
- // TODO: Initialize a data map by calling init method on the data map
- map.put(dataMapName, dataMap);
return dataMap;
}
- public void clearDataMap(String dataMapName, DataMapType mapType) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- if (map != null && map.get(dataMapName) != null) {
- map.remove(dataMapName).clear();
+ /**
+ * Clear the datamap with the given datamap name of the given table from memory
+ * @param identifier
+ * @param dataMapName
+ */
+ public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+ List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+ if (tableDataMaps != null) {
+ int i = 0;
+ for (TableDataMap tableDataMap: tableDataMaps) {
+ if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+ tableDataMap.clear(new ArrayList<String>());
+ tableDataMaps.remove(i);
+ break;
+ }
+ i++;
+ }
}
}
+ /**
+ * Returns the singleton instance
+ * @return
+ */
public static DataMapStoreManager getInstance() {
return instance;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
index b6a0f5b..0059b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
@@ -16,19 +16,21 @@
*/
package org.apache.carbondata.core.indexstore;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+
/**
* Datamap type
*/
public enum DataMapType {
- BLOCKLET("org.apache.carbondata.datamap.BlockletDataMap");
+ BLOCKLET(BlockletDataMapFactory.class);
- private String className;
+ private Class<? extends DataMapFactory> classObject;
- DataMapType(String className) {
- this.className = className;
+ DataMapType(Class<? extends DataMapFactory> classObject) {
+ this.classObject = classObject;
}
- public String getClassName() {
- return className;
+ public Class<? extends DataMapFactory> getClassObject() {
+ return classObject;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
new file mode 100644
index 0000000..7e2bc0e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+
+/**
+ * Key that identifies one carbon index file of a segment. It is built from the table
+ * identifier, the segment id and the index file name; all three take part in equals and
+ * hashCode.
+ */
+public class TableBlockIndexUniqueIdentifier {
+
+  /**
+   * table fully qualified identifier
+   */
+  private final AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  /**
+   * id of the segment, used as the "Segment_" suffix when building the file path
+   */
+  private final String segmentId;
+
+  /**
+   * name of the carbon index file inside the segment folder
+   */
+  private final String carbonIndexFileName;
+
+  /**
+   * Constructor to initialize the class instance
+   *
+   * @param absoluteTableIdentifier table fully qualified identifier
+   * @param segmentId id of the segment
+   * @param carbonIndexFileName name of the carbon index file
+   */
+  public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String segmentId, String carbonIndexFileName) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.segmentId = segmentId;
+    this.carbonIndexFileName = carbonIndexFileName;
+  }
+
+  /**
+   * @return the table identifier given at construction
+   */
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  /**
+   * @return the segment id given at construction
+   */
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * Builds the string key for this index file in the form
+   * databaseName/tableName_tableId/segmentId/carbonIndexFileName, joined with the separators
+   * defined in CarbonCommonConstants.
+   *
+   * @return the key string
+   */
+  public String getUniqueTableSegmentIdentifier() {
+    CarbonTableIdentifier tableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier();
+    String tablePart = tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + tableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+        + tableIdentifier.getTableId();
+    return tablePart + CarbonCommonConstants.FILE_SEPARATOR + segmentId
+        + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName;
+  }
+
+  /**
+   * @return the table path followed by /Fact/Part0/Segment_ + segmentId + / + index file name
+   */
+  public String getFilePath() {
+    String segmentFolder =
+        absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId;
+    return segmentFolder + "/" + carbonIndexFileName;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TableBlockIndexUniqueIdentifier other = (TableBlockIndexUniqueIdentifier) o;
+    // compared in the same order as the fields are hashed
+    return absoluteTableIdentifier.equals(other.absoluteTableIdentifier)
+        && segmentId.equals(other.segmentId)
+        && carbonIndexFileName.equals(other.carbonIndexFileName);
+  }
+
+  @Override public int hashCode() {
+    int tableAndSegmentHash = 31 * absoluteTableIdentifier.hashCode() + segmentId.hashCode();
+    return 31 * tableAndSegmentHash + carbonIndexFileName.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
index e1532c8..39ca4c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
@@ -16,38 +16,34 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.events.ChangeEvent;
import org.apache.carbondata.core.events.EventListener;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
/**
* DataMap at the table level, user can add any number of datamaps for one table. Depends
* on the filter condition it can prune the blocklets.
*/
-public interface TableDataMap extends EventListener {
+public final class TableDataMap implements EventListener {
- /**
- * It is called to initialize and load the required table datamap metadata.
- */
- void init(AbsoluteTableIdentifier identifier, String dataMapName);
+ private AbsoluteTableIdentifier identifier;
- /**
- * Gives the writer to write the metadata information of this datamap at table level.
- *
- * @return
- */
- DataMapWriter getWriter();
+ private String dataMapName;
+
+ private DataMapFactory dataMapFactory;
/**
- * Create the datamap using the segmentid and name.
- *
- * @param identifier
- * @param segmentId
- * @return
+ * It is called to initialize and load the required table datamap metadata.
*/
- DataMap createDataMap(AbsoluteTableIdentifier identifier, String segmentId);
+ public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+ DataMapFactory dataMapFactory) {
+ this.identifier = identifier;
+ this.dataMapName = dataMapName;
+ this.dataMapFactory = dataMapFactory;
+ }
/**
* Pass the valid segments and prune the datamap using filter expression
@@ -56,7 +52,24 @@ public interface TableDataMap extends EventListener {
* @param filterExp
* @return
*/
- List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp);
+ public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
+ List<Blocklet> blocklets = new ArrayList<>();
+ for (String segmentId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+ for (DataMap dataMap : dataMaps) {
+ List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+ blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+ }
+ }
+ return blocklets;
+ }
+
+ private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+ for (Blocklet blocklet : pruneBlocklets) {
+ blocklet.setSegmentId(segmentId);
+ }
+ return pruneBlocklets;
+ }
/**
* This is used for making the datamap distributable.
@@ -65,7 +78,16 @@ public interface TableDataMap extends EventListener {
*
* @return
*/
- List<DataMapDistributable> toDistributable(List<String> segmentIds);
+ public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
+ List<DataMapDistributable> distributables = new ArrayList<>();
+ for (String segmentsId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
+ for (DataMap dataMap : dataMaps) {
+ distributables.add(dataMap.toDistributable());
+ }
+ }
+ return distributables;
+ }
/**
* This method is used from any machine after it is distributed. It takes the distributable object
@@ -75,20 +97,37 @@ public interface TableDataMap extends EventListener {
* @param filterExp
* @return
*/
- List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp);
+ public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+ return dataMapFactory.getDataMap(distributable).prune(filterExp);
+ }
+
+ @Override public void fireEvent(ChangeEvent event) {
+ dataMapFactory.fireEvent(event);
+ }
/**
- * This method checks whether the columns and the type of filters supported
- * for this datamap or not
- *
- * @param filterExp
- * @return
+ * Clear only the datamaps of the segments
+ * @param segmentIds
*/
- boolean isFiltersSupported(FilterResolverIntf filterExp);
+ public void clear(List<String> segmentIds) {
+ for (String segmentId: segmentIds) {
+ dataMapFactory.clear(segmentId);
+ }
+ }
/**
- * Clears table level datamap
+ * Clears all datamap
+ */
+ public void clear() {
+ dataMapFactory.clear();
+ }
+ /**
+ * Get the unique name of datamap
+ *
+ * @return
*/
- void clear();
+ public String getDataMapName() {
+ return dataMapName;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
new file mode 100644
index 0000000..8246f99
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryAllocator;
+import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Store the data map row @{@link DataMapRow} data to unsafe.
+ */
+public class UnsafeMemoryDMStore {
+
+ private MemoryBlock memoryBlock;
+
+ private static int capacity = 8 * 1024 * 1024;
+
+ private int allocatedSize;
+
+ private int runningLength;
+
+ private MemoryAllocator memoryAllocator;
+
+ private boolean isMemoryFreed;
+
+ private DataMapSchema[] schema;
+
+ private int[] pointers;
+
+ private int rowCount;
+
+ public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+ this.schema = schema;
+ this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
+ this.allocatedSize = capacity;
+ this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+ this.pointers = new int[1000];
+ }
+
+ /**
+ * Check memory is sufficient or not, if not sufficient allocate more memory and copy old data to
+ * new one.
+ *
+ * @param rowSize
+ */
+ private void ensureSize(int rowSize) {
+ if (runningLength + rowSize >= allocatedSize) {
+ MemoryBlock allocate =
+ MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+ unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+ allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+ memoryAllocator.free(memoryBlock);
+ allocatedSize = allocatedSize + capacity;
+ memoryBlock = allocate;
+ }
+ if (this.pointers.length <= rowCount + 1) {
+ int[] newPointer = new int[pointers.length + 1000];
+ System.arraycopy(pointers, 0, newPointer, 0, pointers.length);
+ this.pointers = newPointer;
+ }
+ }
+
+ /**
+ * Add the index row to unsafe.
+ *
+ * @param indexRow
+ * @return
+ */
+ public void addIndexRowToUnsafe(DataMapRow indexRow) {
+ // First calculate the required memory to keep the row in unsafe
+ int rowSize = indexRow.getTotalSizeInBytes();
+ // Check whether allocated memory is sufficient or not.
+ ensureSize(rowSize);
+ int pointer = runningLength;
+
+ for (int i = 0; i < schema.length; i++) {
+ addToUnsafe(schema[i], indexRow, i);
+ }
+ pointers[rowCount++] = pointer;
+ }
+
+ private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) {
+ switch (schema.getSchemaType()) {
+ case FIXED:
+ switch (schema.getDataType()) {
+ case BYTE:
+ unsafe.putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getByte(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case SHORT:
+ unsafe
+ .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getShort(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case INT:
+ unsafe.putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getInt(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case LONG:
+ unsafe.putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getLong(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case FLOAT:
+ unsafe
+ .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getFloat(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case DOUBLE:
+ unsafe
+ .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getDouble(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case BYTE_ARRAY:
+ byte[] data = row.getByteArray(index);
+ unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+ memoryBlock.getBaseOffset() + runningLength, data.length);
+ runningLength += row.getSizeInBytes(index);
+ break;
+ }
+ break;
+ case VARIABLE:
+ byte[] data = row.getByteArray(index);
+ unsafe.putShort(memoryBlock.getBaseOffset() + runningLength, (short) data.length);
+ runningLength += 2;
+ unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+ memoryBlock.getBaseOffset() + runningLength, data.length);
+ runningLength += data.length;
+ break;
+ case STRUCT:
+ DataMapSchema[] childSchemas =
+ ((DataMapSchema.StructDataMapSchema) schema).getChildSchemas();
+ DataMapRow struct = row.getRow(index);
+ for (int i = 0; i < childSchemas.length; i++) {
+ addToUnsafe(childSchemas[i], struct, i);
+ }
+ break;
+ }
+ }
+
+ public DataMapRow getUnsafeRow(int index) {
+ assert (index < rowCount);
+ return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
+ }
+
+ public void finishWriting() {
+ if (runningLength < allocatedSize) {
+ MemoryBlock allocate =
+ MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+ unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+ allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+ memoryAllocator.free(memoryBlock);
+ memoryBlock = allocate;
+ }
+ // Compact pointers.
+ if (rowCount < pointers.length) {
+ int[] newPointer = new int[rowCount];
+ System.arraycopy(pointers, 0, newPointer, 0, rowCount);
+ this.pointers = newPointer;
+ }
+ }
+
+ public void freeMemory() {
+ if (!isMemoryFreed) {
+ memoryAllocator.free(memoryBlock);
+ isMemoryFreed = true;
+ }
+ }
+
+ public int getMemoryUsed() {
+ return runningLength;
+ }
+
+ public DataMapSchema[] getSchema() {
+ return schema;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
new file mode 100644
index 0000000..9a50600
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Data map comparator. Compares two datamap rows by the index key stored in their first
+ * field, sort column by sort column.
+ */
+public class BlockletDMComparator implements Comparator<DataMapRow> {
+
+  /**
+   * no dictionary column value is of variable length so in each column value
+   * it will -1
+   */
+  private static final int NO_DICTIONARY_COLUMN_VALUE = -1;
+
+  /**
+   * size of the short value in bytes
+   */
+  private static final short SHORT_SIZE_IN_BYTES = 2;
+
+  /**
+   * position of the index key inside the row
+   */
+  private static final int KEY_POSITION = 0;
+
+  private int[] eachColumnValueSize;
+
+  /**
+   * the number of no dictionary columns in SORT_COLUMNS
+   */
+  private int numberOfNoDictSortColumns;
+
+  /**
+   * the number of columns in SORT_COLUMNS
+   */
+  private int numberOfSortColumns;
+
+  public BlockletDMComparator(int[] eachColumnValueSize, int numberOfSortColumns,
+      int numberOfNoDictSortColumns) {
+    this.eachColumnValueSize = eachColumnValueSize;
+    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  /**
+   * Compares the index keys of the two rows. Dictionary columns are compared on their fixed
+   * size slice of the dictionary key. No dictionary columns are located through the short
+   * offsets at the start of the no dictionary key; the last no dictionary column runs up to
+   * the end of the key.
+   *
+   * @param first row holding the first key
+   * @param second row holding the second key
+   * @return result of the first sort column that differs, 0 if all of them are equal
+   */
+  @Override public int compare(DataMapRow first, DataMapRow second) {
+    byte[][] firstBytes = splitKey(first.getByteArray(KEY_POSITION));
+    // the key of the second row has to come from the second row, splitting the first row twice
+    // makes every comparison return 0
+    byte[][] secondBytes = splitKey(second.getByteArray(KEY_POSITION));
+    byte[] firstDictionaryKeys = firstBytes[0];
+    byte[] secondDictionaryKeys = secondBytes[0];
+    byte[] firstNoDictionaryKeys = firstBytes[1];
+    byte[] secondNoDictionaryKeys = secondBytes[1];
+    ByteBuffer firstNoDictionaryKeyBuffer = ByteBuffer.wrap(firstNoDictionaryKeys);
+    ByteBuffer secondNoDictionaryKeyBuffer = ByteBuffer.wrap(secondNoDictionaryKeys);
+
+    int dictionaryKeyOffset = 0;
+    int nonDictionaryKeyOffset = 0;
+    int remainingNoDictionaryColumns = numberOfNoDictSortColumns;
+
+    for (int i = 0; i < numberOfSortColumns; i++) {
+      int compareResult;
+      if (eachColumnValueSize[i] != NO_DICTIONARY_COLUMN_VALUE) {
+        compareResult = ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(firstDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i],
+                secondDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i]);
+        dictionaryKeyOffset += eachColumnValueSize[i];
+      } else {
+        int firstOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+        int secondOffset = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+        int firstLength;
+        int secondLength;
+        if (remainingNoDictionaryColumns > 1) {
+          // the value ends where the next no dictionary value starts
+          firstLength =
+              firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+                  - firstOffset;
+          secondLength =
+              secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+                  - secondOffset;
+          nonDictionaryKeyOffset += SHORT_SIZE_IN_BYTES;
+          remainingNoDictionaryColumns--;
+        } else {
+          // last no dictionary value runs up to the end of the key
+          firstLength = firstNoDictionaryKeys.length - firstOffset;
+          secondLength = secondNoDictionaryKeys.length - secondOffset;
+        }
+        compareResult = ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(firstNoDictionaryKeys, firstOffset, firstLength, secondNoDictionaryKeys,
+                secondOffset, secondLength);
+      }
+      if (compareResult != 0) {
+        return compareResult;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Split the index key to dictionary and no dictionary. The key starts with two ints giving
+   * the size of the dictionary part and of the no dictionary part, followed by the two parts.
+   *
+   * @param startKey serialized index key
+   * @return array with the dictionary key at position 0 and the no dictionary key at position 1
+   */
+  private byte[][] splitKey(byte[] startKey) {
+    ByteBuffer buffer = ByteBuffer.wrap(startKey);
+    buffer.rewind();
+    int dictonaryKeySize = buffer.getInt();
+    int nonDictonaryKeySize = buffer.getInt();
+    byte[] dictionaryKey = new byte[dictonaryKeySize];
+    buffer.get(dictionaryKey);
+    byte[] nonDictionaryKey = new byte[nonDictonaryKeySize];
+    buffer.get(nonDictionaryKey);
+    return new byte[][] {dictionaryKey, nonDictionaryKey};
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
new file mode 100644
index 0000000..79aa091
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockletDataMap implements DataMap, Cacheable {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+
+ private static int KEY_INDEX = 0;
+
+ private static int MIN_VALUES_INDEX = 1;
+
+ private static int MAX_VALUES_INDEX = 2;
+
+ private static int ROW_COUNT_INDEX = 3;
+
+ private static int FILE_PATH_INDEX = 4;
+
+ private static int PAGE_COUNT_INDEX = 5;
+
+ private static int VERSION_INDEX = 6;
+
+ private static int SCHEMA_UPADATED_TIME_INDEX = 7;
+
+ private static int BLOCK_INFO_INDEX = 8;
+
+ private UnsafeMemoryDMStore unsafeMemoryDMStore;
+
+ private SegmentProperties segmentProperties;
+
+ private int[] columnCardinality;
+
+  /**
+   * This datamap does not provide a writer.
+   *
+   * @return always null
+   */
+  @Override
+  public DataMapWriter getWriter() {
+    return null;
+  }
+
+  /**
+   * Reads the index information found at the given path and loads one row per blocklet into
+   * the unsafe store. Segment properties and the row schema are created from the first footer
+   * that is read.
+   *
+   * @param path path passed to the footer converter to read the index information
+   * @throws RuntimeException wrapping the IOException raised while reading
+   */
+  @Override public void init(String path) {
+    DataFileFooterConverter footerConverter = new DataFileFooterConverter();
+    try {
+      for (DataFileFooter indexFooter : footerConverter.getIndexInfo(path)) {
+        List<ColumnSchema> columnInTable = indexFooter.getColumnInTable();
+        if (segmentProperties == null) {
+          columnCardinality = indexFooter.getSegmentInfo().getColumnCardinality();
+          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+          createSchema(segmentProperties);
+        }
+        TableBlockInfo blockInfo = indexFooter.getBlockInfo().getTableBlockInfo();
+        // the footer read through the block info is the one whose blocklets get loaded
+        DataFileFooter blockFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+        loadToUnsafe(blockFooter, segmentProperties, blockInfo.getFilePath());
+      }
+      // the store only exists once a schema was created, i.e. at least one footer was read
+      if (unsafeMemoryDMStore != null) {
+        unsafeMemoryDMStore.finishWriting();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Adds one row per blocklet of the given footer to the unsafe store. The fields are filled
+   * in the order of the schema built by createSchema: start key, min values, max values,
+   * number of rows, file path, number of pages, version, schema updated time and the
+   * serialized blocklet info.
+   *
+   * @param fileFooter footer holding the blocklets to load
+   * @param segmentProperties gives the value size of each dimension column
+   * @param filePath path stored with every blocklet row
+   */
+  private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties,
+      String filePath) {
+    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    DataMapSchema[] schema = unsafeMemoryDMStore.getSchema();
+    for (BlockletInfo blockletInfo : fileFooter.getBlockletList()) {
+      DataMapRow row = new DataMapRowImpl(schema);
+      int ordinal = 0;
+
+      // add start key as index key
+      byte[] startKey = blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey();
+      row.setByteArray(startKey, ordinal++);
+
+      // add min values followed by max values, each as a struct row
+      BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
+      DataMapRow minRow = addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues());
+      row.setRow(minRow, ordinal);
+      ordinal++;
+      DataMapRow maxRow = addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues());
+      row.setRow(maxRow, ordinal);
+      ordinal++;
+
+      // add number of rows
+      row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
+
+      // add file path
+      row.setByteArray(filePath.getBytes(), ordinal++);
+
+      // add pages
+      row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+
+      // add version number
+      row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+      // add schema updated time
+      row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+      // add blocklet info, serialized through its own write method
+      byte[] serializedData;
+      try {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        DataOutput dataOutput = new DataOutputStream(byteStream);
+        blockletInfo.write(dataOutput);
+        serializedData = byteStream.toByteArray();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      row.setByteArray(serializedData, ordinal);
+
+      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+    }
+  }
+
+  /**
+   * Builds the struct row holding one value per dimension column. It is used for the min
+   * values as well as for the max values.
+   *
+   * @param minMaxLen value size of each dimension column, only its length is used here
+   * @param dataMapSchema struct schema of the min or max field
+   * @param minValues values to store, one per dimension column
+   * @return row with the value of column i at position i
+   */
+  private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) {
+    DataMapSchema.StructDataMapSchema structSchema =
+        (DataMapSchema.StructDataMapSchema) dataMapSchema;
+    DataMapRow valueRow = new DataMapRowImpl(structSchema.getChildSchemas());
+    for (int column = 0; column < minMaxLen.length; column++) {
+      valueRow.setByteArray(minValues[column], column);
+    }
+    return valueRow;
+  }
+
+  /**
+   * Builds the schema of a blocklet row and creates the unsafe store for it. The order of the
+   * fields is: index key, min values, max values, number of rows, table block path, number of
+   * pages, version number, schema updated time and blocklet info.
+   *
+   * @param segmentProperties gives the value size of each dimension column
+   */
+  private void createSchema(SegmentProperties segmentProperties) {
+    List<DataMapSchema> rowSchema = new ArrayList<>();
+
+    // Index key
+    rowSchema.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    // one struct for min and one for max, each with one child per dimension column
+    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    for (int structCount = 0; structCount < 2; structCount++) {
+      DataMapSchema[] columnSchemas = new DataMapSchema[minMaxLen.length];
+      for (int column = 0; column < columnSchemas.length; column++) {
+        int valueSize = minMaxLen[column];
+        // columns without a positive value size are stored with variable length
+        columnSchemas[column] = valueSize <= 0
+            ? new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY)
+            : new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, valueSize);
+      }
+      rowSchema.add(new DataMapSchema.StructDataMapSchema(DataType.STRUCT, columnSchemas));
+    }
+
+    // for number of rows.
+    rowSchema.add(new DataMapSchema.FixedDataMapSchema(DataType.INT));
+
+    // for table block path
+    rowSchema.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    // for number of pages.
+    rowSchema.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+    // for version number.
+    rowSchema.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+    // for schema updated time.
+    rowSchema.add(new DataMapSchema.FixedDataMapSchema(DataType.LONG));
+
+    // for blocklet info
+    rowSchema.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    DataMapSchema[] schemaArray = rowSchema.toArray(new DataMapSchema[rowSchema.size()]);
+    unsafeMemoryDMStore = new UnsafeMemoryDMStore(schemaArray);
+  }
+
+  /**
+   * Prunes the blocklets of this datamap with the given filter. Without a filter every
+   * blocklet is returned. With a filter the rows between the start and the end index found
+   * for the search keys are checked against their min and max values, and only the blocklets
+   * for which the filter executer asks for a scan are returned.
+   *
+   * @param filterExp filter resolver tree, null to select all blocklets
+   * @return the selected blocklets, never null
+   * @throws RuntimeException if the default start or end key cannot be generated
+   */
+  @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
+
+    // getting the start and end index key based on filter for hitting the
+    // selected block reference nodes based on filter resolver tree.
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("preparing the start and end key for finding"
+          + "start and end block as per filter resolver");
+    }
+    List<Blocklet> blocklets = new ArrayList<>();
+    Comparator<DataMapRow> comparator =
+        new BlockletDMComparator(segmentProperties.getEachDimColumnValueSize(),
+            segmentProperties.getNumberOfSortColumns(),
+            segmentProperties.getNumberOfNoDictSortColumns());
+    List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+    FilterUtil
+        .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys);
+    // reading the first value from list which has start key
+    IndexKey searchStartKey = listOfStartEndKeys.get(0);
+    // reading the last value from list which has end key
+    IndexKey searchEndKey = listOfStartEndKeys.get(1);
+    if (null == searchStartKey && null == searchEndKey) {
+      try {
+        // TODO need to handle for no dictionary dimensions
+        searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+        // TODO need to handle for no dictionary dimensions
+        searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+      } catch (KeyGenException e) {
+        // returning null here would only fail later in the caller without the real cause
+        throw new RuntimeException("Failed to prepare the default start and end index key", e);
+      }
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Successfully retrieved the start and end key" + "Dictionary Start Key: " + searchStartKey
+              .getDictionaryKeys() + "No Dictionary Start Key " + searchStartKey
+              .getNoDictionaryKeys() + "Dictionary End Key: " + searchEndKey.getDictionaryKeys()
+              + "No Dictionary End Key " + searchEndKey.getNoDictionaryKeys());
+    }
+    if (filterExp == null) {
+      // no filter, every blocklet is selected
+      int rowCount = unsafeMemoryDMStore.getRowCount();
+      for (int i = 0; i < rowCount; i++) {
+        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i);
+        blocklets.add(createBlocklet(unsafeRow, i));
+      }
+    } else {
+      int startIndex = findStartIndex(convertToRow(searchStartKey), comparator);
+      int endIndex = findEndIndex(convertToRow(searchEndKey), comparator);
+      FilterExecuter filterExecuter =
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+      for (int i = startIndex; i <= endIndex; i++) {
+        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i);
+        // max values are passed first, min values second
+        BitSet bitSet = filterExecuter.isScanRequired(getMinMaxValue(unsafeRow, MAX_VALUES_INDEX),
+            getMinMaxValue(unsafeRow, MIN_VALUES_INDEX));
+        if (!bitSet.isEmpty()) {
+          blocklets.add(createBlocklet(unsafeRow, i));
+        }
+      }
+    }
+
+    return blocklets;
+  }
+
+ /**
+  * Copies every column of the nested row stored at the given position of {@code row} into a
+  * two dimensional byte array, one entry per column.
+  *
+  * @param row   data map row holding the nested min/max rows
+  * @param index position of the nested row inside {@code row}
+  * @return the byte array of each column of the nested row, in column order
+  */
+ private byte[][] getMinMaxValue(DataMapRow row, int index) {
+   DataMapRow nestedRow = row.getRow(index);
+   int columnCount = nestedRow.getColumnCount();
+   byte[][] values = new byte[columnCount][];
+   int column = 0;
+   while (column < columnCount) {
+     values[column] = nestedRow.getByteArray(column);
+     column++;
+   }
+   return values;
+ }
+
+ /**
+  * Builds a {@link Blocklet} from one row of the store: the file path and blocklet id go to
+  * the blocklet itself, the remaining columns populate its {@link BlockletDetailInfo}.
+  *
+  * @param row        row read from the store
+  * @param blockletId position of the row, used as the blocklet id
+  * @return blocklet carrying the detail info decoded from the row
+  * @throws RuntimeException wrapping the IOException raised while decoding the blocklet info
+  */
+ private Blocklet createBlocklet(DataMapRow row, int blockletId) {
+   // NOTE(review): the path bytes are decoded with the platform default charset - confirm
+   // this matches the charset used when the row was written
+   String filePath = new String(row.getByteArray(FILE_PATH_INDEX));
+   Blocklet result = new Blocklet(filePath, String.valueOf(blockletId));
+
+   BlockletDetailInfo info = new BlockletDetailInfo();
+   info.setRowCount(row.getInt(ROW_COUNT_INDEX));
+   info.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
+   info.setVersionNumber(row.getShort(VERSION_INDEX));
+   info.setDimLens(columnCardinality);
+   info.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
+
+   // the blocklet info is kept in the row in serialized form
+   BlockletInfo decoded = new BlockletInfo();
+   try (DataInputStream in =
+       new DataInputStream(new ByteArrayInputStream(row.getByteArray(BLOCK_INFO_INDEX)))) {
+     decoded.readFields(in);
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   }
+   info.setBlockletInfo(decoded);
+
+   result.setDetailInfo(info);
+   return result;
+ }
+
+ /**
+ * Binary search over the rows of the store that returns the index of the first
+ * row at which a scan for the given search key should start.
+ *
+ * If rows equal to the key exist, the index of the first of them is returned.
+ * If the search ends on a row that is greater than the key, the index of the
+ * preceding row is returned (or 0 when there is none). Returns 0 when the store
+ * has no rows.
+ *
+ * @param key search key
+ * @param comparator comparator used to compare the key with the stored rows
+ * @return index of the first tentative row
+ */
+ private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+ int childNodeIndex;
+ int low = 0;
+ int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int mid = 0;
+ int compareRes = -1;
+ // classic binary search; mid and compareRes keep the last probed row and its result
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+ // compare the entries
+ compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ if (compareRes < 0) {
+ high = mid - 1;
+ } else if (compareRes > 0) {
+ low = mid + 1;
+ } else {
+ // key matched: walk backwards to the first row that is equal to the key
+ int currentPos = mid;
+ while (currentPos - 1 >= 0
+ && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+ currentPos--;
+ }
+ mid = currentPos;
+ break;
+ }
+ }
+ // if the last comparison was less than zero (key is smaller than the row at mid)
+ // and mid is more than 0, step back to the previous row, as duplicate
+ // records can be present there
+ if (compareRes < 0) {
+ if (mid > 0) {
+ mid--;
+ }
+ childNodeIndex = mid;
+ } else {
+ childNodeIndex = mid;
+ }
+ // index of the row to start from
+ return childNodeIndex;
+ }
+
+ /**
+ * Binary search over the rows of the store that returns the index of the last
+ * row up to which a scan for the given search key should run.
+ *
+ * If rows equal to the key exist, the index of the last of them is returned.
+ * If the search ends on a row that is greater than the key, the index of the
+ * preceding row is returned (or 0 when there is none). Returns 0 when the store
+ * has no rows.
+ *
+ * @param key search key
+ * @param comparator comparator used to compare the key with the stored rows
+ * @return index of the last tentative row
+ */
+ private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+ int childNodeIndex;
+ int low = 0;
+ int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int mid = 0;
+ int compareRes = -1;
+ // classic binary search; mid and compareRes keep the last probed row and its result
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+ // compare the entries
+ compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ if (compareRes < 0) {
+ high = mid - 1;
+ } else if (compareRes > 0) {
+ low = mid + 1;
+ } else {
+ int currentPos = mid;
+ // key matched: walk forwards to the last row that is equal to the key
+ while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
+ && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+ currentPos++;
+ }
+ mid = currentPos;
+ break;
+ }
+ }
+ // if the last comparison was less than zero (key is smaller than the row at mid)
+ // and mid is more than 0, step back to the previous row, as duplicate
+ // records can be present there
+ if (compareRes < 0) {
+ if (mid > 0) {
+ mid--;
+ }
+ childNodeIndex = mid;
+ } else {
+ childNodeIndex = mid;
+ }
+ return childNodeIndex;
+ }
+
+ /**
+  * Wraps an index key into a data map row so it can be compared with the stored rows.
+  * The key is serialized as: dictionary key length (int), no-dictionary key length (int),
+  * dictionary key bytes, no-dictionary key bytes; the result is set at position 0 of the row.
+  *
+  * @param key index key to convert
+  * @return row created with the schema of the store and holding the serialized key
+  */
+ private DataMapRow convertToRow(IndexKey key) {
+   byte[] dictionaryKeys = key.getDictionaryKeys();
+   byte[] noDictionaryKeys = key.getNoDictionaryKeys();
+   // 8 extra bytes for the two int length headers
+   byte[] serializedKey =
+       ByteBuffer.allocate(dictionaryKeys.length + noDictionaryKeys.length + 8)
+           .putInt(dictionaryKeys.length)
+           .putInt(noDictionaryKeys.length)
+           .put(dictionaryKeys)
+           .put(noDictionaryKeys)
+           .array();
+   DataMapRowImpl keyRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+   keyRow.setByteArray(serializedKey, 0);
+   return keyRow;
+ }
+
+ /**
+  * Frees the memory held by the store and drops the references to the store and the segment
+  * properties. Calling it again after the store was released is a no-op.
+  */
+ @Override public void clear() {
+   // guard against repeated calls: the store reference is null once it has been released
+   if (unsafeMemoryDMStore != null) {
+     unsafeMemoryDMStore.freeMemory();
+     unsafeMemoryDMStore = null;
+   }
+   segmentProperties = null;
+ }
+
+ /**
+ * Not tracked by this data map.
+ *
+ * @return always 0
+ */
+ @Override public long getFileTimeStamp() {
+ return 0;
+ }
+
+ /**
+ * Not tracked by this data map.
+ *
+ * @return always 0
+ */
+ @Override public int getAccessCount() {
+ return 0;
+ }
+
+ /**
+  * Reports the memory currently used by the store.
+  *
+  * @return memory used by the store, or 0 when the store has already been released
+  */
+ @Override public long getMemorySize() {
+   // the store reference is null after clear(); nothing is held any more in that case
+   if (unsafeMemoryDMStore == null) {
+     return 0;
+   }
+   return unsafeMemoryDMStore.getMemoryUsed();
+ }
+
+ /**
+ * Not implemented yet.
+ *
+ * @return always {@code null}
+ */
+ @Override public DataMapDistributable toDistributable() {
+ // TODO
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
new file mode 100644
index 0000000..2fe6643
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapFactory;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Table map for blocklet
+ */
+public class BlockletDataMapFactory implements DataMapFactory {
+
+ private AbsoluteTableIdentifier identifier;
+
+ private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+
+ private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+
+ public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+ this.identifier = identifier;
+ cache = CacheProvider.getInstance()
+ .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
+ }
+
+ public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+ return null;
+ }
+
+ public List<DataMap> getDataMaps(String segmentId) {
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ segmentMap.get(segmentId);
+ if (tableBlockIndexUniqueIdentifiers == null) {
+ tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+ String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId;
+ FileFactory.FileType fileType = FileFactory.getFileType(path);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(".carbonindex");
+ }
+ });
+ for (int i = 0; i < listFiles.length; i++) {
+ tableBlockIndexUniqueIdentifiers.add(
+ new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName()));
+ }
+ }
+
+ try {
+ return cache.getAll(tableBlockIndexUniqueIdentifiers);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override public boolean isFiltersSupported(FilterType filterType) {
+ return true;
+ }
+
+ public void clear(String segmentId) {
+ List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
+ if (blockIndexes != null) {
+ for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
+ DataMap dataMap = cache.getIfPresent(blockIndex);
+ dataMap.clear();
+ cache.invalidate(blockIndex);
+ }
+ }
+ }
+
+ @Override public void clear() {
+ for (String segmentId: segmentMap.keySet()) {
+ clear(segmentId);
+ }
+ }
+
+ @Override public DataMap getDataMap(DataMapDistributable distributable) {
+ return null;
+ }
+
+ @Override public void fireEvent(ChangeEvent event) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
new file mode 100644
index 0000000..5509c75
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+
+/**
+ * wrapper for blocklet data map data
+ */
+public class BlockletDataRefNodeWrapper implements DataRefNode {
+
+ private List<TableBlockInfo> blockInfos;
+
+ private int index;
+
+ private int[] dimensionLens;
+
+ private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
+
+ public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
+ int[] dimensionLens) {
+ this.blockInfos = blockInfos;
+ this.index = index;
+ this.dimensionLens = dimensionLens;
+ }
+
+ @Override public DataRefNode getNextDataRefNode() {
+ if (index + 1 < blockInfos.size()) {
+ new BlockletDataRefNodeWrapper(blockInfos, index + 1, dimensionLens);
+ }
+ return null;
+ }
+
+ @Override public int nodeSize() {
+ return blockInfos.get(index).getDetailInfo().getRowCount();
+ }
+
+ @Override public long nodeNumber() {
+ return index;
+ }
+
+ @Override public byte[][] getColumnsMaxValue() {
+ return null;
+ }
+
+ @Override public byte[][] getColumnsMinValue() {
+ return null;
+ }
+
+ @Override
+ public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
+ throws IOException {
+ DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+ return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes);
+ }
+
+ @Override
+ public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndexes)
+ throws IOException {
+ DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+ return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndexes);
+ }
+
+ @Override
+ public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+ throws IOException {
+ MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+ return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes);
+ }
+
+ @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
+ throws IOException {
+ MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+ return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex);
+ }
+
+ private DimensionColumnChunkReader getDimensionColumnChunkReader() throws IOException {
+ ColumnarFormatVersion version =
+ ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+ DimensionColumnChunkReader dimensionColumnChunkReader = CarbonDataReaderFactory.getInstance()
+ .getDimensionColumnChunkReader(version,
+ blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens,
+ blockInfos.get(index).getFilePath());
+ return dimensionColumnChunkReader;
+ }
+
+ private MeasureColumnChunkReader getMeasureColumnChunkReader() throws IOException {
+ ColumnarFormatVersion version =
+ ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+ return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version,
+ blockInfos.get(index).getDetailInfo().getBlockletInfo(),
+ blockInfos.get(index).getFilePath());
+ }
+
+ @Override
+ public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) {
+ this.deleteDeltaDataCache = deleteDeltaDataCache;
+ }
+
+ @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+ return deleteDeltaDataCache;
+ }
+
+ @Override public int numberOfPages() {
+ return blockInfos.get(index).getDetailInfo().getPagesCount();
+ }
+
+ public int numberOfNodes() {
+ return blockInfos.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
new file mode 100644
index 0000000..b8cffc6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Wrapper of abstract index
+ * TODO it could be removed after refactor
+ */
+public class IndexWrapper extends AbstractIndex {
+
+ public IndexWrapper(List<TableBlockInfo> blockInfos) {
+ DataFileFooter fileFooter = null;
+ try {
+ fileFooter = CarbonUtil.readMetadatFile(blockInfos.get(0));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(),
+ fileFooter.getSegmentInfo().getColumnCardinality());
+ dataRefNode = new BlockletDataRefNodeWrapper(blockInfos, 0,
+ segmentProperties.getDimensionColumnsValueSize());
+ }
+
+ @Override public void buildIndex(List<DataFileFooter> footerList) {
+ }
+}