Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/27 12:42:01 UTC
[1/7] carbondata git commit: Optimizing decimal datatype
Repository: carbondata
Updated Branches:
refs/heads/master f2bb8d380 -> 79feac96a
Optimizing decimal datatype
Optimized big decimal to use less space
Fixed comments
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/92fe63cf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/92fe63cf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/92fe63cf
Branch: refs/heads/master
Commit: 92fe63cf3a1378c68623bb8f97b72bc9d2a75d94
Parents: f2bb8d3
Author: ravipesala <ra...@gmail.com>
Authored: Wed Jun 28 16:03:03 2017 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Thu Jul 27 18:47:51 2017 +0800
----------------------------------------------------------------------
.../carbondata/core/metadata/datatype/DecimalConverterFactory.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92fe63cf/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 459eb24..555df1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
*/
public final class DecimalConverterFactory {
- public static final DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
+ public static DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
private int[] minBytesForPrecision = minBytesForPrecision();
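The hunk above shows only part of this change, but the minBytesForPrecision table it touches is what drives the space saving named in the commit message: the unscaled value of a decimal with precision p is stored in the smallest signed two's-complement width that can hold 10^p - 1. A hedged, self-contained sketch of how such a table can be derived (illustrative class name, not the project's converter code):

import java.math.BigInteger;

public final class DecimalWidthSketch {

  // Smallest number of bytes whose signed two's-complement range can hold
  // any unscaled value of the given decimal precision.
  static int minBytesForPrecision(int precision) {
    BigInteger max = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    int numBytes = 1;
    // a signed value numBytes wide has 8 * numBytes - 1 magnitude bits
    while (max.bitLength() > 8 * numBytes - 1) {
      numBytes++;
    }
    return numBytes;
  }

  public static void main(String[] args) {
    // e.g. precision 9 -> 4 bytes, precision 18 -> 8 bytes, precision 38 -> 16 bytes
    for (int p : new int[] {1, 9, 18, 38}) {
      System.out.println("precision " + p + " -> " + minBytesForPrecision(p) + " bytes");
    }
  }
}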
[6/7] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet
Posted by ja...@apache.org.
[CARBONDATA-1232] Datamap implementation for Blocklet
This closes #1099
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b6812449
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b6812449
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b6812449
Branch: refs/heads/master
Commit: b6812449a4040dc5d3454cd0d6dd38f07be2854c
Parents: 2dbfab6
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jun 17 22:53:57 2017 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Thu Jul 27 18:47:52 2017 +0800
----------------------------------------------------------------------
.../carbondata/core/cache/CacheProvider.java | 3 +
.../apache/carbondata/core/cache/CacheType.java | 6 +
.../core/datastore/block/TableBlockInfo.java | 19 +
.../core/datastore/block/TaskBlockInfo.java | 4 +
.../carbondata/core/indexstore/Blocklet.java | 55 +-
.../indexstore/BlockletDataMapIndexStore.java | 180 ++++++
.../core/indexstore/BlockletDetailInfo.java | 117 ++++
.../carbondata/core/indexstore/DataMap.java | 8 +-
.../core/indexstore/DataMapFactory.java | 87 +++
.../core/indexstore/DataMapStoreManager.java | 90 ++-
.../carbondata/core/indexstore/DataMapType.java | 14 +-
.../TableBlockIndexUniqueIdentifier.java | 103 ++++
.../core/indexstore/TableDataMap.java | 97 +++-
.../core/indexstore/UnsafeMemoryDMStore.java | 207 +++++++
.../blockletindex/BlockletDMComparator.java | 134 +++++
.../blockletindex/BlockletDataMap.java | 445 +++++++++++++++
.../blockletindex/BlockletDataMapFactory.java | 115 ++++
.../BlockletDataRefNodeWrapper.java | 137 +++++
.../indexstore/blockletindex/IndexWrapper.java | 49 ++
.../core/indexstore/row/DataMapRow.java | 89 +++
.../core/indexstore/row/DataMapRowImpl.java | 106 ++++
.../core/indexstore/row/UnsafeDataMapRow.java | 133 +++++
.../core/indexstore/schema/DataMapSchema.java | 124 ++++
.../core/indexstore/schema/FilterType.java | 24 +
.../core/metadata/blocklet/BlockletInfo.java | 53 +-
.../core/metadata/index/BlockIndexInfo.java | 27 +
.../executor/impl/AbstractQueryExecutor.java | 52 +-
.../executer/IncludeFilterExecuterImpl.java | 2 +-
.../executer/RangeValueFilterExecuterImpl.java | 2 +-
.../RowLevelRangeGrtThanFiterExecuterImpl.java | 2 +-
...elRangeGrtrThanEquaToFilterExecuterImpl.java | 2 +-
...velRangeLessThanEqualFilterExecuterImpl.java | 2 +-
.../RowLevelRangeLessThanFiterExecuterImpl.java | 2 +-
.../processor/AbstractDataBlockIterator.java | 3 +
.../AbstractDetailQueryResultIterator.java | 34 +-
.../util/AbstractDataFileFooterConverter.java | 53 ++
.../apache/carbondata/core/util/CarbonUtil.java | 40 +-
.../core/util/DataFileFooterConverter.java | 4 +
.../core/util/DataFileFooterConverter2.java | 3 +
.../core/util/DataFileFooterConverterV3.java | 11 +
format/src/main/thrift/carbondata_index.thrift | 1 +
.../carbondata/hadoop/CarbonInputFormat.java | 14 +-
.../carbondata/hadoop/CarbonInputSplit.java | 39 +-
.../hadoop/api/CarbonTableInputFormat.java | 562 ++++++++++++++++---
.../hadoop/util/CarbonInputFormatUtil.java | 7 +-
.../presto/impl/CarbonTableReader.java | 56 +-
.../spark/rdd/CarbonIUDMergerRDD.scala | 5 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 9 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 22 +-
.../carbondata/spark/util/QueryPlanUtil.scala | 10 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 14 +-
.../sql/execution/command/IUDCommands.scala | 7 -
.../carbondata/spark/util/QueryPlanUtil.scala | 10 +-
.../processing/merger/CarbonCompactionUtil.java | 32 ++
54 files changed, 3167 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 25a8976..5c4b265 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.BlockIndexStore;
import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexStore;
import org.apache.carbondata.core.util.CarbonProperties;
/**
@@ -126,6 +127,8 @@ public class CacheProvider {
} else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
cacheObject =
new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache);
+ } else if (cacheType.equals(cacheType.DRIVER_BLOCKLET_DATAMAP)) {
+ cacheObject = new BlockletDataMapIndexStore(carbonStorePath, carbonLRUCache);
}
cacheTypeToCacheMap.put(cacheType, cacheObject);
}
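For orientation, a hedged usage sketch of the new cache type; it assumes the usual CacheProvider.getInstance()/createCache(CacheType, String) entry points that the hunk above sits inside, and the store path and identifier are illustrative:

// Hypothetical driver-side lookup; the identifier is built from a segment's index file.
Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> cache = CacheProvider.getInstance()
    .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, "/carbon/store");
BlockletDataMap dataMap = cache.get(tableBlockIndexUniqueIdentifier);  // may throw IOException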
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
index 2d6570d..ab51ff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
@@ -56,6 +56,12 @@ public class CacheType<K, V> {
DRIVER_BTREE = new CacheType("driver_btree");
/**
+ * Driver cache which maintains the blocklet datamap indexes of segments
+ */
+ public static final CacheType<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+ DRIVER_BLOCKLET_DATAMAP = new CacheType("driver_blocklet_datamap");
+
+ /**
* cacheName which is unique name for a cache
*/
private String cacheName;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 1da6699..316e202 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -78,6 +79,8 @@ public class TableBlockInfo implements Distributable, Serializable {
*/
private String[] deletedDeltaFilePath;
+ private BlockletDetailInfo detailInfo;
+
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
this.filePath = FileFactory.getUpdatedFilePath(filePath);
@@ -89,6 +92,10 @@ public class TableBlockInfo implements Distributable, Serializable {
this.deletedDeltaFilePath = deletedDeltaFilePath;
}
+ public TableBlockInfo() {
+
+ }
+
/**
* constructor to initialize the TableBlockInfo with BlockletInfos
*
@@ -322,4 +329,16 @@ public class TableBlockInfo implements Distributable, Serializable {
public String[] getDeletedDeltaFilePath() {
return deletedDeltaFilePath;
}
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
index eb707c2..4fcec87 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.datastore.block;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,9 @@ public class TaskBlockInfo {
return taskBlockInfoMapping.keySet();
}
+ public Collection<List<TableBlockInfo>> getAllTableBlockInfoList() {
+ return taskBlockInfoMapping.values();
+ }
/**
* returns TableBlockInfoList of given task
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 597c46c..66da4d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,27 +16,76 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.io.IOException;
import java.io.Serializable;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
/**
* Blocklet
*/
public class Blocklet implements Serializable {
- private String path;
+ private Path path;
+
+ private String segmentId;
private String blockletId;
+ private BlockletDetailInfo detailInfo;
+
+ private long length;
+
+ private String[] location;
+
public Blocklet(String path, String blockletId) {
- this.path = path;
+ this.path = new Path(path);
this.blockletId = blockletId;
}
- public String getPath() {
+ public Path getPath() {
return path;
}
public String getBlockletId() {
return blockletId;
}
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
+
+ public void updateLocations() throws IOException {
+ FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+ LocatedFileStatus fileStatus = iter.next();
+ location = fileStatus.getBlockLocations()[0].getHosts();
+ length = fileStatus.getLen();
+ }
+
+ public String[] getLocations() throws IOException {
+ return location;
+ }
+
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.segmentId = segmentId;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
new file mode 100644
index 0000000..fc8c273
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+
+/**
+ * Class to handle loading, unloading, clearing and storing of the table's
+ * blocklet datamaps
+ */
+public class BlockletDataMapIndexStore
+ implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
+ /**
+ * carbon store path
+ */
+ protected String carbonStorePath;
+ /**
+ * CarbonLRU cache
+ */
+ protected CarbonLRUCache lruCache;
+
+ /**
+ * map of segment identifier to lock object. It is filled while a datamap is being
+ * loaded and is useful when loading concurrently: only a segment-level lock is
+ * applied, so other segments can be loaded at the same time
+ */
+ private Map<String, Object> segmentLockMap;
+
+ /**
+ * constructor to initialize the BlockletDataMapIndexStore
+ *
+ * @param carbonStorePath
+ * @param lruCache
+ */
+ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+ this.carbonStorePath = carbonStorePath;
+ this.lruCache = lruCache;
+ segmentLockMap = new ConcurrentHashMap<String, Object>();
+ }
+
+ @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+ throws IOException {
+ String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+ BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
+ if (dataMap == null) {
+ try {
+ dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
+ } catch (IndexBuilderException e) {
+ throw new IOException(e.getMessage(), e);
+ } catch (Throwable e) {
+ throw new IOException("Problem in loading segment block.", e);
+ }
+ }
+ return dataMap;
+ }
+
+ @Override public List<BlockletDataMap> getAll(
+ List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
+ List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+ try {
+ for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
+ blockletDataMaps.add(get(identifier));
+ }
+ } catch (Throwable e) {
+ for (BlockletDataMap dataMap : blockletDataMaps) {
+ dataMap.clear();
+ }
+ throw new IOException("Problem in loading segment blocks.", e);
+ }
+ return blockletDataMaps;
+ }
+
+ /**
+ * returns the SegmentTaskIndexWrapper
+ *
+ * @param tableSegmentUniqueIdentifier
+ * @return
+ */
+ @Override public BlockletDataMap getIfPresent(
+ TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+ BlockletDataMap dataMap = (BlockletDataMap) lruCache
+ .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ return dataMap;
+ }
+
+ /**
+ * method invalidate the segment cache for segment
+ *
+ * @param tableSegmentUniqueIdentifier
+ */
+ @Override public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+ lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ }
+
+ /**
+ * Below method will be used to load the blocklet datamap for the given
+ * identifier. Loading is guarded by a segment-level lock so the same index
+ * file is not loaded twice concurrently.
+ *
+ * @return the loaded BlockletDataMap, which is also put into the LRU cache
+ * @throws IOException
+ */
+ private BlockletDataMap loadAndGetDataMap(
+ TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+ String uniqueTableSegmentIdentifier =
+ tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+ Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+ if (lock == null) {
+ lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+ }
+ BlockletDataMap dataMap = null;
+ synchronized (lock) {
+ dataMap = new BlockletDataMap();
+ dataMap.init(tableSegmentUniqueIdentifier.getFilePath());
+ lruCache.put(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(), dataMap,
+ dataMap.getMemorySize());
+ }
+ return dataMap;
+ }
+
+ /**
+ * Below method will be used to get the segment level lock object
+ *
+ * @param uniqueIdentifier
+ * @return lock object
+ */
+ private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
+ // get the segment lock object if it is present then return
+ // otherwise add the new lock and return
+ Object segmentLoaderLockObject = segmentLockMap.get(uniqueIdentifier);
+ if (null == segmentLoaderLockObject) {
+ segmentLoaderLockObject = new Object();
+ segmentLockMap.put(uniqueIdentifier, segmentLoaderLockObject);
+ }
+ return segmentLoaderLockObject;
+ }
+
+ /**
+ * The method clears the access count of table segments
+ *
+ * @param tableSegmentUniqueIdentifiers
+ */
+ @Override public void clearAccessCount(
+ List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+ for (TableBlockIndexUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+ BlockletDataMap cacheable =
+ (BlockletDataMap) lruCache.get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ cacheable.clear();
+ }
+ }
+}
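Note that getAll() clears any datamaps it has already loaded when a later load fails, so callers can treat it as all-or-nothing. A hedged sketch of driver-side use, with storePath, lruCache and identifiers assumed to be built elsewhere:

BlockletDataMapIndexStore store = new BlockletDataMapIndexStore(storePath, lruCache);
// each index file's datamap is loaded at most once, guarded by the per-segment lock
List<BlockletDataMap> dataMaps = store.getAll(identifiers);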
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
new file mode 100644
index 0000000..68dedd8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Blocklet detail information to be sent to each executor
+ */
+public class BlockletDetailInfo implements Serializable, Writable {
+
+ private int rowCount;
+
+ private short pagesCount;
+
+ private short versionNumber;
+
+ private int[] dimLens;
+
+ private long schemaUpdatedTimeStamp;
+
+ private BlockletInfo blockletInfo;
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(int rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ public int getPagesCount() {
+ return pagesCount;
+ }
+
+ public void setPagesCount(short pagesCount) {
+ this.pagesCount = pagesCount;
+ }
+
+ public short getVersionNumber() {
+ return versionNumber;
+ }
+
+ public void setVersionNumber(short versionNumber) {
+ this.versionNumber = versionNumber;
+ }
+
+ public BlockletInfo getBlockletInfo() {
+ return blockletInfo;
+ }
+
+ public void setBlockletInfo(BlockletInfo blockletInfo) {
+ this.blockletInfo = blockletInfo;
+ }
+
+ public int[] getDimLens() {
+ return dimLens;
+ }
+
+ public void setDimLens(int[] dimLens) {
+ this.dimLens = dimLens;
+ }
+
+ public long getSchemaUpdatedTimeStamp() {
+ return schemaUpdatedTimeStamp;
+ }
+
+ public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
+ this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
+ }
+
+ @Override public void write(DataOutput out) throws IOException {
+ out.writeInt(rowCount);
+ out.writeShort(pagesCount);
+ out.writeShort(versionNumber);
+ out.writeShort(dimLens.length);
+ for (int i = 0; i < dimLens.length; i++) {
+ out.writeInt(dimLens[i]);
+ }
+ out.writeLong(schemaUpdatedTimeStamp);
+ blockletInfo.write(out);
+ }
+
+ @Override public void readFields(DataInput in) throws IOException {
+ rowCount = in.readInt();
+ pagesCount = in.readShort();
+ versionNumber = in.readShort();
+ dimLens = new int[in.readShort()];
+ for (int i = 0; i < dimLens.length; i++) {
+ dimLens[i] = in.readInt();
+ }
+ schemaUpdatedTimeStamp = in.readLong();
+ blockletInfo = new BlockletInfo();
+ blockletInfo.readFields(in);
+ }
+}
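Since BlockletDetailInfo travels to executors through Hadoop's Writable contract, a round trip through a byte stream is the natural smoke test. A hedged sketch, assuming BlockletInfo's own write/readFields tolerate a freshly constructed instance:

BlockletDetailInfo info = new BlockletDetailInfo();
info.setRowCount(32000);
info.setPagesCount((short) 1);
info.setVersionNumber((short) 3);
info.setDimLens(new int[] {100, 50});
info.setSchemaUpdatedTimeStamp(System.currentTimeMillis());
info.setBlockletInfo(new BlockletInfo());

java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
info.write(new java.io.DataOutputStream(bos));

BlockletDetailInfo copy = new BlockletDetailInfo();
copy.readFields(new java.io.DataInputStream(
    new java.io.ByteArrayInputStream(bos.toByteArray())));
// copy now mirrors info field for field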
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
index 2651f15..1276494 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
/**
- * Interface for adding and retrieving index data.
+ * Datamap is an entity which can store and retrieve index data.
*/
public interface DataMap {
@@ -47,6 +47,12 @@ public interface DataMap {
List<Blocklet> prune(FilterResolverIntf filterExp);
/**
+ * Convert datamap to distributable object
+ * @return
+ */
+ DataMapDistributable toDistributable();
+
+ /**
* Clear complete index table and release memory.
*/
void clear();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
new file mode 100644
index 0000000..72f714f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.util.List;
+
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for datamap factory, it is responsible for creating the datamap.
+ */
+public interface DataMapFactory {
+
+ /**
+ * Initialization of Datamap factory
+ * @param identifier
+ * @param dataMapName
+ */
+ void init(AbsoluteTableIdentifier identifier, String dataMapName);
+ /**
+ * Get the datamap writer for each segmentid.
+ *
+ * @param identifier
+ * @param segmentId
+ * @return
+ */
+ DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
+ String segmentId);
+
+ /**
+ * Get the datamap for segmentid
+ *
+ * @param segmentId
+ * @return
+ */
+ List<DataMap> getDataMaps(String segmentId);
+
+ /**
+ * Get datamap for distributable object.
+ *
+ * @param distributable
+ * @return
+ */
+ DataMap getDataMap(DataMapDistributable distributable);
+
+ /**
+ * This method checks whether the given columns and type of filter are supported
+ * by this datamap or not
+ *
+ * @param filterType
+ * @return
+ */
+ boolean isFiltersSupported(FilterType filterType);
+
+ /**
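+ * Notify the datamap factory of a change event on the table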
+ *
+ * @param event
+ */
+ void fireEvent(ChangeEvent event);
+
+ /**
+ * Clears datamap of the segment
+ */
+ void clear(String segmentId);
+
+ /**
+ * Clear all datamaps from memory
+ */
+ void clear();
+
+}
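To make the contract concrete, a minimal skeleton of an implementing class; this is a hedged illustration with placeholder bodies, not the BlockletDataMapFactory added elsewhere in this commit:

public class ExampleDataMapFactory implements DataMapFactory {

  private AbsoluteTableIdentifier identifier;
  private String dataMapName;

  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
    this.identifier = identifier;
    this.dataMapName = dataMapName;
  }

  @Override public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
      String segmentId) {
    return null; // placeholder: return a writer bound to this segment
  }

  @Override public List<DataMap> getDataMaps(String segmentId) {
    return new java.util.ArrayList<>(); // placeholder: one DataMap per index file
  }

  @Override public DataMap getDataMap(DataMapDistributable distributable) {
    return null; // placeholder: rebuild the datamap on the executor side
  }

  @Override public boolean isFiltersSupported(FilterType filterType) {
    return false; // placeholder: declare which filter types this datamap can prune
  }

  @Override public void fireEvent(ChangeEvent event) { }

  @Override public void clear(String segmentId) { }

  @Override public void clear() { }
}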
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
index 64c6e20..1a36187 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
@@ -16,7 +16,9 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogService;
@@ -24,13 +26,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
/**
- * It maintains all the index tables in it.
+ * It maintains all the DataMaps in it.
*/
-public class DataMapStoreManager {
+public final class DataMapStoreManager {
private static DataMapStoreManager instance = new DataMapStoreManager();
- private Map<DataMapType, Map<String, TableDataMap>> dataMapMappping = new HashMap<>();
+ /**
+ * Contains the list of datamaps for each table.
+ */
+ private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMapping = new HashMap<>();
private static final LogService LOGGER =
LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
@@ -48,56 +53,85 @@ public class DataMapStoreManager {
*/
public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
DataMapType mapType) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- TableDataMap dataMap = null;
- if (map == null) {
+ List<TableDataMap> tableDataMaps = dataMapMapping.get(identifier);
+ TableDataMap dataMap;
+ if (tableDataMaps == null) {
+ createTableDataMap(identifier, mapType, dataMapName);
+ tableDataMaps = dataMapMapping.get(identifier);
+ }
+ dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+ if (dataMap == null) {
throw new RuntimeException("Datamap does not exist");
- } else {
- dataMap = map.get(dataMapName);
- if (dataMap == null) {
- throw new RuntimeException("Datamap does not exist");
- }
}
- // Initialize datamap
- dataMap.init(identifier, dataMapName);
return dataMap;
}
/**
- * Create new datamap instance using datamap type and path
+ * Create new datamap instance using datamap name, datamap type and table identifier
*
* @param mapType
* @return
*/
- public TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, DataMapType mapType,
- String dataMapName) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- if (map == null) {
- map = new HashMap<>();
- dataMapMappping.put(mapType, map);
+ private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
+ DataMapType mapType, String dataMapName) {
+ List<TableDataMap> tableDataMaps = dataMapMapping.get(identifier);
+ if (tableDataMaps == null) {
+ tableDataMaps = new ArrayList<>();
+ dataMapMapping.put(identifier, tableDataMaps);
}
- TableDataMap dataMap = map.get(dataMapName);
+ TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
if (dataMap != null) {
throw new RuntimeException("Already datamap exists in that path with type " + mapType);
}
try {
- //TODO create datamap using @mapType.getClassName())
+ DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
+ dataMapFactory.init(identifier, dataMapName);
+ dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
} catch (Exception e) {
LOGGER.error(e);
+ throw new RuntimeException(e);
+ }
+ tableDataMaps.add(dataMap);
+ return dataMap;
+ }
+
+ private TableDataMap getAbstractTableDataMap(String dataMapName,
+ List<TableDataMap> tableDataMaps) {
+ TableDataMap dataMap = null;
+ for (TableDataMap tableDataMap: tableDataMaps) {
+ if (tableDataMap.getDataMapName().equals(dataMapName)) {
+ dataMap = tableDataMap;
+ break;
+ }
}
- // TODO: Initialize a data map by calling init method on the data map
- map.put(dataMapName, dataMap);
return dataMap;
}
- public void clearDataMap(String dataMapName, DataMapType mapType) {
- Map<String, TableDataMap> map = dataMapMappping.get(mapType);
- if (map != null && map.get(dataMapName) != null) {
- map.remove(dataMapName).clear();
+ /**
+ * Clears the datamap with the given name for the given table from memory
+ * @param identifier
+ * @param dataMapName
+ */
+ public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+ List<TableDataMap> tableDataMaps = dataMapMapping.get(identifier);
+ if (tableDataMaps != null) {
+ int i = 0;
+ for (TableDataMap tableDataMap: tableDataMaps) {
+ if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+ tableDataMap.clear(new ArrayList<String>());
+ tableDataMaps.remove(i);
+ break;
+ }
+ i++;
+ }
}
}
+ /**
+ * Returns the singleton instance
+ * @return
+ */
public static DataMapStoreManager getInstance() {
return instance;
}
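The lookup path is now lazy: the first getDataMap call for a table creates its TableDataMap by reflectively instantiating the factory class registered on the DataMapType enum (see the enum change below). A hedged sketch of the call site, with "blocklet" as an illustrative datamap name:

TableDataMap tableDataMap = DataMapStoreManager.getInstance()
    .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);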
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
index b6a0f5b..0059b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
@@ -16,19 +16,21 @@
*/
package org.apache.carbondata.core.indexstore;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+
/**
* Datamap type
*/
public enum DataMapType {
- BLOCKLET("org.apache.carbondata.datamap.BlockletDataMap");
+ BLOCKLET(BlockletDataMapFactory.class);
- private String className;
+ private Class<? extends DataMapFactory> classObject;
- DataMapType(String className) {
- this.className = className;
+ DataMapType(Class<? extends DataMapFactory> classObject) {
+ this.classObject = classObject;
}
- public String getClassName() {
- return className;
+ public Class<? extends DataMapFactory> getClassObject() {
+ return classObject;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
new file mode 100644
index 0000000..7e2bc0e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+
+/**
+ * Class holds the absoluteTableIdentifier, segmentId and index file name to uniquely
+ * identify an index file of a segment
+ */
+public class TableBlockIndexUniqueIdentifier {
+ /**
+ * table fully qualified identifier
+ */
+ private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+ private String segmentId;
+
+ private String carbonIndexFileName;
+
+ /**
+ * Constructor to initialize the class instance
+ *
+ * @param absoluteTableIdentifier
+ * @param segmentId
+ */
+ public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+ String segmentId, String carbonIndexFileName) {
+ this.absoluteTableIdentifier = absoluteTableIdentifier;
+ this.segmentId = segmentId;
+ this.carbonIndexFileName = carbonIndexFileName;
+ }
+
+ /**
+ * returns AbsoluteTableIdentifier
+ *
+ * @return
+ */
+ public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+ return absoluteTableIdentifier;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ /**
+ * method returns the id to uniquely identify the index file in the cache
+ *
+ * @return
+ */
+ public String getUniqueTableSegmentIdentifier() {
+ CarbonTableIdentifier carbonTableIdentifier =
+ absoluteTableIdentifier.getCarbonTableIdentifier();
+ return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+ + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+ + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId
+ + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName;
+ }
+
+ public String getFilePath() {
+ return absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + "/"
+ + carbonIndexFileName;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TableBlockIndexUniqueIdentifier that = (TableBlockIndexUniqueIdentifier) o;
+
+ if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) {
+ return false;
+ }
+ if (!segmentId.equals(that.segmentId)) {
+ return false;
+ }
+ return carbonIndexFileName.equals(that.carbonIndexFileName);
+ }
+
+ @Override public int hashCode() {
+ int result = absoluteTableIdentifier.hashCode();
+ result = 31 * result + segmentId.hashCode();
+ result = 31 * result + carbonIndexFileName.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
index e1532c8..39ca4c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
@@ -16,38 +16,34 @@
*/
package org.apache.carbondata.core.indexstore;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.events.ChangeEvent;
import org.apache.carbondata.core.events.EventListener;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
/**
* DataMap at the table level, user can add any number of datamaps for one table. Depending
* on the filter condition it can prune the blocklets.
*/
-public interface TableDataMap extends EventListener {
+public final class TableDataMap implements EventListener {
- /**
- * It is called to initialize and load the required table datamap metadata.
- */
- void init(AbsoluteTableIdentifier identifier, String dataMapName);
+ private AbsoluteTableIdentifier identifier;
- /**
- * Gives the writer to write the metadata information of this datamap at table level.
- *
- * @return
- */
- DataMapWriter getWriter();
+ private String dataMapName;
+
+ private DataMapFactory dataMapFactory;
/**
- * Create the datamap using the segmentid and name.
- *
- * @param identifier
- * @param segmentId
- * @return
+ * It is called to initialize and load the required table datamap metadata.
*/
- DataMap createDataMap(AbsoluteTableIdentifier identifier, String segmentId);
+ public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+ DataMapFactory dataMapFactory) {
+ this.identifier = identifier;
+ this.dataMapName = dataMapName;
+ this.dataMapFactory = dataMapFactory;
+ }
/**
* Pass the valid segments and prune the datamap using filter expression
@@ -56,7 +52,24 @@ public interface TableDataMap extends EventListener {
* @param filterExp
* @return
*/
- List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp);
+ public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
+ List<Blocklet> blocklets = new ArrayList<>();
+ for (String segmentId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+ for (DataMap dataMap : dataMaps) {
+ List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+ blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+ }
+ }
+ return blocklets;
+ }
+
+ private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+ for (Blocklet blocklet : pruneBlocklets) {
+ blocklet.setSegmentId(segmentId);
+ }
+ return pruneBlocklets;
+ }
/**
* This is used for making the datamap distributable.
@@ -65,7 +78,16 @@ public interface TableDataMap extends EventListener {
*
* @return
*/
- List<DataMapDistributable> toDistributable(List<String> segmentIds);
+ public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
+ List<DataMapDistributable> distributables = new ArrayList<>();
+ for (String segmentsId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
+ for (DataMap dataMap : dataMaps) {
+ distributables.add(dataMap.toDistributable());
+ }
+ }
+ return distributables;
+ }
/**
* This method is used from any machine after it is distributed. It takes the distributable object
@@ -75,20 +97,37 @@ public interface TableDataMap extends EventListener {
* @param filterExp
* @return
*/
- List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp);
+ public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+ return dataMapFactory.getDataMap(distributable).prune(filterExp);
+ }
+
+ @Override public void fireEvent(ChangeEvent event) {
+ dataMapFactory.fireEvent(event);
+ }
/**
- * This method checks whether the columns and the type of filters supported
- * for this datamap or not
- *
- * @param filterExp
- * @return
+ * Clear only the datamaps of the segments
+ * @param segmentIds
*/
- boolean isFiltersSupported(FilterResolverIntf filterExp);
+ public void clear(List<String> segmentIds) {
+ for (String segmentId: segmentIds) {
+ dataMapFactory.clear(segmentId);
+ }
+ }
/**
- * Clears table level datamap
+ * Clears all datamap
+ */
+ public void clear() {
+ dataMapFactory.clear();
+ }
+ /**
+ * Get the unique name of datamap
+ *
+ * @return
*/
- void clear();
+ public String getDataMapName() {
+ return dataMapName;
+ }
}
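With TableDataMap now concrete, driver-side pruning reduces to one call per table; validSegments and filterResolver are assumed to come from the existing input-format code paths:

List<Blocklet> pruned = tableDataMap.prune(validSegments, filterResolver);
for (Blocklet blocklet : pruned) {
  blocklet.updateLocations();   // fill hosts and length before building input splits
}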
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
new file mode 100644
index 0000000..8246f99
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryAllocator;
+import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Store the data map row @{@link DataMapRow} data to unsafe.
+ */
+public class UnsafeMemoryDMStore {
+
+ private MemoryBlock memoryBlock;
+
+ private static int capacity = 8 * 1024 * 1024;
+
+ private int allocatedSize;
+
+ private int runningLength;
+
+ private MemoryAllocator memoryAllocator;
+
+ private boolean isMemoryFreed;
+
+ private DataMapSchema[] schema;
+
+ private int[] pointers;
+
+ private int rowCount;
+
+ public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+ this.schema = schema;
+ this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
+ this.allocatedSize = capacity;
+ this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+ this.pointers = new int[1000];
+ }
+
+ /**
+ * Check whether the allocated memory is sufficient; if not, allocate a larger block and
+ * copy the old data into it.
+ *
+ * @param rowSize
+ */
+ private void ensureSize(int rowSize) {
+ if (runningLength + rowSize >= allocatedSize) {
+ MemoryBlock allocate =
+ MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+ unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+ allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+ memoryAllocator.free(memoryBlock);
+ allocatedSize = allocatedSize + capacity;
+ memoryBlock = allocate;
+ }
+ if (this.pointers.length <= rowCount + 1) {
+ int[] newPointer = new int[pointers.length + 1000];
+ System.arraycopy(pointers, 0, newPointer, 0, pointers.length);
+ this.pointers = newPointer;
+ }
+ }
+
+ /**
+ * Add the index row to unsafe.
+ *
+ * @param indexRow
+ */
+ public void addIndexRowToUnsafe(DataMapRow indexRow) {
+ // First calculate the required memory to keep the row in unsafe
+ int rowSize = indexRow.getTotalSizeInBytes();
+ // Check whether allocated memory is sufficient or not.
+ ensureSize(rowSize);
+ int pointer = runningLength;
+
+ for (int i = 0; i < schema.length; i++) {
+ addToUnsafe(schema[i], indexRow, i);
+ }
+ pointers[rowCount++] = pointer;
+ }
+
+ private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) {
+ switch (schema.getSchemaType()) {
+ case FIXED:
+ switch (schema.getDataType()) {
+ case BYTE:
+ unsafe.putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getByte(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case SHORT:
+ unsafe
+ .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getShort(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case INT:
+ unsafe.putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getInt(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case LONG:
+ unsafe.putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getLong(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case FLOAT:
+ unsafe
+ .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getFloat(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case DOUBLE:
+ unsafe
+ .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+ row.getDouble(index));
+ runningLength += row.getSizeInBytes(index);
+ break;
+ case BYTE_ARRAY:
+ byte[] data = row.getByteArray(index);
+ unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+ memoryBlock.getBaseOffset() + runningLength, data.length);
+ runningLength += row.getSizeInBytes(index);
+ break;
+ }
+ break;
+ case VARIABLE:
+ byte[] data = row.getByteArray(index);
+ unsafe.putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, (short) data.length);
+ runningLength += 2;
+ unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+ memoryBlock.getBaseOffset() + runningLength, data.length);
+ runningLength += data.length;
+ break;
+ case STRUCT:
+ DataMapSchema[] childSchemas =
+ ((DataMapSchema.StructDataMapSchema) schema).getChildSchemas();
+ DataMapRow struct = row.getRow(index);
+ for (int i = 0; i < childSchemas.length; i++) {
+ addToUnsafe(childSchemas[i], struct, i);
+ }
+ break;
+ }
+ }
+
+ public DataMapRow getUnsafeRow(int index) {
+ assert (index < rowCount);
+ return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
+ }
+
+ public void finishWriting() {
+ if (runningLength < allocatedSize) {
+ MemoryBlock allocate =
+ MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+ unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+ allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+ memoryAllocator.free(memoryBlock);
+ memoryBlock = allocate;
+ }
+ // Compact pointers.
+ if (rowCount < pointers.length) {
+ int[] newPointer = new int[rowCount];
+ System.arraycopy(pointers, 0, newPointer, 0, rowCount);
+ this.pointers = newPointer;
+ }
+ }
+
+ public void freeMemory() {
+ if (!isMemoryFreed) {
+ memoryAllocator.free(memoryBlock);
+ isMemoryFreed = true;
+ }
+ }
+
+ public int getMemoryUsed() {
+ return runningLength;
+ }
+
+ public DataMapSchema[] getSchema() {
+ return schema;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+}
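The store's memory management is a classic grow-by-chunk append buffer: rows are written back-to-back, each row's start offset is recorded in pointers, and finishWriting() compacts both the buffer and the pointer table to their exact sizes. A self-contained heap-based analogue of the same pattern (ByteBuffer instead of unsafe memory; illustrative class name):

import java.nio.ByteBuffer;
import java.util.Arrays;

public final class GrowableRowStore {
  private static final int CHUNK = 8 * 1024 * 1024;   // same 8 MB growth unit as above
  private ByteBuffer buffer = ByteBuffer.allocate(CHUNK);
  private int[] pointers = new int[1000];
  private int rowCount;

  public void addRow(byte[] row) {
    if (buffer.remaining() < row.length) {            // ensureSize: grow by one chunk
      ByteBuffer bigger = ByteBuffer.allocate(buffer.capacity() + CHUNK);
      buffer.flip();
      bigger.put(buffer);
      buffer = bigger;
    }
    if (pointers.length <= rowCount + 1) {            // grow the pointer table by 1000
      pointers = Arrays.copyOf(pointers, pointers.length + 1000);
    }
    pointers[rowCount++] = buffer.position();         // remember where this row starts
    buffer.put(row);
  }

  public byte[] getRow(int index) {
    int start = pointers[index];
    int end = index + 1 < rowCount ? pointers[index + 1] : buffer.position();
    return Arrays.copyOfRange(buffer.array(), start, end);
  }

  public void finishWriting() {                       // compact pointers to exact size
    pointers = Arrays.copyOf(pointers, rowCount);
  }
}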
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
new file mode 100644
index 0000000..9a50600
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Data map comparator
+ */
+public class BlockletDMComparator implements Comparator<DataMapRow> {
+
+ /**
+ * no dictionary column values are of variable length, so their entry in
+ * eachColumnValueSize is -1
+ */
+ private static final int NO_DICTIONARY_COLUMN_VALUE = -1;
+
+ /**
+ * size of a short value in bytes
+ */
+ private static final short SHORT_SIZE_IN_BYTES = 2;
+
+ private int[] eachColumnValueSize;
+
+ /**
+ * the number of no dictionary columns in SORT_COLUMNS
+ */
+ private int numberOfNoDictSortColumns;
+
+ /**
+ * the number of columns in SORT_COLUMNS
+ */
+ private int numberOfSortColumns;
+
+ public BlockletDMComparator(int[] eachColumnValueSize, int numberOfSortColumns,
+ int numberOfNoDictSortColumns) {
+ this.eachColumnValueSize = eachColumnValueSize;
+ this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+ this.numberOfSortColumns = numberOfSortColumns;
+ }
+
+ @Override public int compare(DataMapRow first, DataMapRow second) {
+ int dictionaryKeyOffset = 0;
+ int nonDictionaryKeyOffset = 0;
+ int compareResult = 0;
+ int processedNoDictionaryColumn = numberOfNoDictSortColumns;
+ byte[][] firstBytes = splitKey(first.getByteArray(0));
+ byte[][] secondBytes = splitKey(second.getByteArray(0));
+ byte[] firstNoDictionaryKeys = firstBytes[1];
+ ByteBuffer firstNoDictionaryKeyBuffer = ByteBuffer.wrap(firstNoDictionaryKeys);
+ byte[] secondNoDictionaryKeys = secondBytes[1];
+ ByteBuffer secondNoDictionaryKeyBuffer = ByteBuffer.wrap(secondNoDictionaryKeys);
+ int actualOffset = 0;
+ int actualOffset1 = 0;
+ int firstNoDcitionaryLength = 0;
+ int secondNodeDictionaryLength = 0;
+
+ for (int i = 0; i < numberOfSortColumns; i++) {
+
+ if (eachColumnValueSize[i] != NO_DICTIONARY_COLUMN_VALUE) {
+ byte[] firstDictionaryKeys = firstBytes[0];
+ byte[] secondDictionaryKeys = secondBytes[0];
+ compareResult = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(firstDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i],
+ secondDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i]);
+ dictionaryKeyOffset += eachColumnValueSize[i];
+ } else {
+ if (processedNoDictionaryColumn > 1) {
+ actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+ firstNoDcitionaryLength =
+ firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+ - actualOffset;
+ actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+ secondNodeDictionaryLength =
+ secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+ - actualOffset1;
+ compareResult = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength,
+ secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength);
+ nonDictionaryKeyOffset += SHORT_SIZE_IN_BYTES;
+ processedNoDictionaryColumn--;
+ } else {
+ actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+ actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+ firstNoDcitionaryLength = firstNoDictionaryKeys.length - actualOffset;
+ secondNodeDictionaryLength = secondNoDictionaryKeys.length - actualOffset1;
+ compareResult = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength,
+ secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength);
+ }
+ }
+ if (compareResult != 0) {
+ return compareResult;
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Split the index key into its dictionary and no-dictionary parts.
+ * @param startKey
+ * @return
+ */
+ private byte[][] splitKey(byte[] startKey) {
+ ByteBuffer buffer = ByteBuffer.wrap(startKey);
+ buffer.rewind();
+ int dictonaryKeySize = buffer.getInt();
+ int nonDictonaryKeySize = buffer.getInt();
+ byte[] dictionaryKey = new byte[dictonaryKeySize];
+ buffer.get(dictionaryKey);
+ byte[] nonDictionaryKey = new byte[nonDictonaryKeySize];
+ buffer.get(nonDictionaryKey);
+ return new byte[][] {dictionaryKey, nonDictionaryKey};
+ }
+}
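splitKey() assumes the composite start key is laid out as [int dictionary-length][int no-dictionary-length][dictionary bytes][no-dictionary bytes]. A short round-trip sketch of that layout (illustrative values):

byte[] dict = new byte[] {1, 2, 3};
byte[] noDict = "abc".getBytes(java.nio.charset.StandardCharsets.UTF_8);

java.nio.ByteBuffer key = java.nio.ByteBuffer.allocate(8 + dict.length + noDict.length);
key.putInt(dict.length).putInt(noDict.length).put(dict).put(noDict);

// reading back mirrors splitKey(): both lengths first, then the two payloads
java.nio.ByteBuffer read = java.nio.ByteBuffer.wrap(key.array());
byte[] dictPart = new byte[read.getInt()];
byte[] noDictPart = new byte[read.getInt()];
read.get(dictPart);
read.get(noDictPart);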
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
new file mode 100644
index 0000000..79aa091
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockletDataMap implements DataMap, Cacheable {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+
+ private static final int KEY_INDEX = 0;
+
+ private static final int MIN_VALUES_INDEX = 1;
+
+ private static final int MAX_VALUES_INDEX = 2;
+
+ private static final int ROW_COUNT_INDEX = 3;
+
+ private static final int FILE_PATH_INDEX = 4;
+
+ private static final int PAGE_COUNT_INDEX = 5;
+
+ private static final int VERSION_INDEX = 6;
+
+ private static final int SCHEMA_UPDATED_TIME_INDEX = 7;
+
+ private static final int BLOCK_INFO_INDEX = 8;
+
+ private UnsafeMemoryDMStore unsafeMemoryDMStore;
+
+ private SegmentProperties segmentProperties;
+
+ private int[] columnCardinality;
+
+ @Override public DataMapWriter getWriter() {
+ return null;
+ }
+
+ @Override public void init(String path) {
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ try {
+ List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+ for (DataFileFooter fileFooter : indexInfo) {
+ List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+ if (segmentProperties == null) {
+ columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+ segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+ createSchema(segmentProperties);
+ }
+ TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+ fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+ loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+ }
+ if (unsafeMemoryDMStore != null) {
+ unsafeMemoryDMStore.finishWriting();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties,
+ String filePath) {
+ int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+ List<BlockletInfo> blockletList = fileFooter.getBlockletList();
+ DataMapSchema[] schema = unsafeMemoryDMStore.getSchema();
+ for (int index = 0; index < blockletList.size(); index++) {
+ DataMapRow row = new DataMapRowImpl(schema);
+ int ordinal = 0;
+ BlockletInfo blockletInfo = blockletList.get(index);
+
+ // add start key as index key
+ row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
+
+ BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal);
+ ordinal++;
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal);
+ ordinal++;
+
+ row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
+
+ // add file path
+ byte[] filePathBytes = filePath.getBytes();
+ row.setByteArray(filePathBytes, ordinal++);
+
+ // add pages
+ row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+
+ // add version number
+ row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+ // add schema updated time
+ row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+ // add blocklet info
+ byte[] serializedData;
+ try {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutput dataOutput = new DataOutputStream(stream);
+ blockletInfo.write(dataOutput);
+ serializedData = stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ row.setByteArray(serializedData, ordinal);
+ unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ }
+ }
+
+ private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) {
+ DataMapSchema[] minSchemas =
+ ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas();
+ DataMapRow minRow = new DataMapRowImpl(minSchemas);
+ int minOrdinal = 0;
+ // the same routine fills either the min or the max values row
+ for (int i = 0; i < minMaxLen.length; i++) {
+ minRow.setByteArray(minValues[i], minOrdinal++);
+ }
+ return minRow;
+ }
+
+ private void createSchema(SegmentProperties segmentProperties) {
+ List<DataMapSchema> indexSchemas = new ArrayList<>();
+
+ // Index key
+ indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+ int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+ // do it 2 times, one for min and one for max.
+ for (int k = 0; k < 2; k++) {
+ DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
+ for (int i = 0; i < minMaxLen.length; i++) {
+ if (minMaxLen[i] <= 0) {
+ mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY);
+ } else {
+ mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, minMaxLen[i]);
+ }
+ }
+ DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataType.STRUCT, mapSchemas);
+ indexSchemas.add(mapSchema);
+ }
+
+ // for number of rows.
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.INT));
+
+ // for table block path
+ indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+ // for number of pages.
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+ // for version number.
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+ // for schema updated time.
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.LONG));
+
+ //for blocklet info
+ indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+ unsafeMemoryDMStore =
+ new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));
+ }
+
+ @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
+
+ // getting the start and end index key based on filter for hitting the
+ // selected block reference nodes based on filter resolver tree.
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("preparing the start and end key for finding"
+ + "start and end block as per filter resolver");
+ }
+ List<Blocklet> blocklets = new ArrayList<>();
+ Comparator<DataMapRow> comparator =
+ new BlockletDMComparator(segmentProperties.getEachDimColumnValueSize(),
+ segmentProperties.getNumberOfSortColumns(),
+ segmentProperties.getNumberOfNoDictSortColumns());
+ List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+ FilterUtil
+ .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys);
+ // reading the first value from list which has start key
+ IndexKey searchStartKey = listOfStartEndKeys.get(0);
+ // reading the last value from list which has end key
+ IndexKey searchEndKey = listOfStartEndKeys.get(1);
+ if (null == searchStartKey && null == searchEndKey) {
+ try {
+ // TODO need to handle for no dictionary dimensions
+ searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+ // TODO need to handle for no dictionary dimensions
+ searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+ } catch (KeyGenException e) {
+ return null;
+ }
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Successfully retrieved the start and end key" + "Dictionary Start Key: " + searchStartKey
+ .getDictionaryKeys() + "No Dictionary Start Key " + searchStartKey
+ .getNoDictionaryKeys() + "Dictionary End Key: " + searchEndKey.getDictionaryKeys()
+ + "No Dictionary End Key " + searchEndKey.getNoDictionaryKeys());
+ }
+ if (filterExp == null) {
+ int rowCount = unsafeMemoryDMStore.getRowCount();
+ for (int i = 0; i < rowCount; i++) {
+ DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i);
+ blocklets.add(createBlocklet(unsafeRow, i));
+ }
+ } else {
+ int startIndex = findStartIndex(convertToRow(searchStartKey), comparator);
+ int endIndex = findEndIndex(convertToRow(searchEndKey), comparator);
+ FilterExecuter filterExecuter =
+ FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+ while (startIndex <= endIndex) {
+ DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex);
+ BitSet bitSet = filterExecuter.isScanRequired(getMinMaxValue(unsafeRow, MAX_VALUES_INDEX),
+ getMinMaxValue(unsafeRow, MIN_VALUES_INDEX));
+ if (!bitSet.isEmpty()) {
+ blocklets.add(createBlocklet(unsafeRow, startIndex));
+ }
+ startIndex++;
+ }
+ }
+
+ return blocklets;
+ }
+
+ private byte[][] getMinMaxValue(DataMapRow row, int index) {
+ DataMapRow minMaxRow = row.getRow(index);
+ byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
+ for (int i = 0; i < minMax.length; i++) {
+ minMax[i] = minMaxRow.getByteArray(i);
+ }
+ return minMax;
+ }
+
+ private Blocklet createBlocklet(DataMapRow row, int blockletId) {
+ Blocklet blocklet =
+ new Blocklet(new String(row.getByteArray(FILE_PATH_INDEX)), String.valueOf(blockletId));
+ BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+ detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
+ detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
+ detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
+ detailInfo.setDimLens(columnCardinality);
+ detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPDATED_TIME_INDEX));
+ BlockletInfo blockletInfo = new BlockletInfo();
+ try {
+ byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
+ DataInputStream inputStream = new DataInputStream(stream);
+ blockletInfo.readFields(inputStream);
+ inputStream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ detailInfo.setBlockletInfo(blockletInfo);
+ blocklet.setDetailInfo(detailInfo);
+ return blocklet;
+ }
+
+ /**
+ * Binary search used to get the first tentative index row based on
+ * search key
+ *
+ * @param key search key
+ * @return index of the first tentative matching row
+ */
+ private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+ int childNodeIndex;
+ int low = 0;
+ int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int mid = 0;
+ int compareRes = -1;
+ //
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+ // compare the entries
+ compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ if (compareRes < 0) {
+ high = mid - 1;
+ } else if (compareRes > 0) {
+ low = mid + 1;
+ } else {
+ // if key is matched then get the first entry
+ int currentPos = mid;
+ while (currentPos - 1 >= 0
+ && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+ currentPos--;
+ }
+ mid = currentPos;
+ break;
+ }
+ }
+ // if the compare result is less than zero and mid is greater than 0,
+ // step back to the previous row because duplicate records can be present
+ if (compareRes < 0 && mid > 0) {
+ mid--;
+ }
+ childNodeIndex = mid;
+ // get the leaf child
+ return childNodeIndex;
+ }
+
+ /**
+ * Binary search used to get the last tentative block based on
+ * search key
+ *
+ * @param key search key
+ * @return index of the last tentative matching row
+ */
+ private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+ int childNodeIndex;
+ int low = 0;
+ int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int mid = 0;
+ int compareRes = -1;
+ //
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+ // compare the entries
+ compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ if (compareRes < 0) {
+ high = mid - 1;
+ } else if (compareRes > 0) {
+ low = mid + 1;
+ } else {
+ int currentPos = mid;
+ // if key is matched then move forward to the last matching entry
+ while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
+ && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+ currentPos++;
+ }
+ mid = currentPos;
+ break;
+ }
+ }
+ // if the compare result is less than zero and mid is greater than 0,
+ // step back to the previous row because duplicate records can be present
+ if (compareRes < 0 && mid > 0) {
+ mid--;
+ }
+ childNodeIndex = mid;
+ return childNodeIndex;
+ }
+
+ private DataMapRow convertToRow(IndexKey key) {
+ ByteBuffer buffer =
+ ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
+ buffer.putInt(key.getDictionaryKeys().length);
+ buffer.putInt(key.getNoDictionaryKeys().length);
+ buffer.put(key.getDictionaryKeys());
+ buffer.put(key.getNoDictionaryKeys());
+ DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+ dataMapRow.setByteArray(buffer.array(), 0);
+ return dataMapRow;
+ }
+
+ @Override public void clear() {
+ unsafeMemoryDMStore.freeMemory();
+ unsafeMemoryDMStore = null;
+ segmentProperties = null;
+ }
+
+ @Override public long getFileTimeStamp() {
+ return 0;
+ }
+
+ @Override public int getAccessCount() {
+ return 0;
+ }
+
+ @Override public long getMemorySize() {
+ return unsafeMemoryDMStore.getMemoryUsed();
+ }
+
+ @Override public DataMapDistributable toDistributable() {
+ // TODO
+ return null;
+ }
+}
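A rough usage sketch, not taken from the patch: the index file path is invented,
and the null filter relies on the null-filter branch of prune shown above
(assuming FilterUtil tolerates a null resolver when preparing the start/end keys).

    // load the blocklet datamap for one carbonindex file and count blocklets
    BlockletDataMap dataMap = new BlockletDataMap();
    dataMap.init("/store/db/table/Fact/Part0/Segment_0/sample.carbonindex"); // sample path
    List<Blocklet> blocklets = dataMap.prune(null); // null filter => every blocklet returned
    System.out.println("blocklets in file: " + blocklets.size());
    dataMap.clear(); // releases the unsafe memory store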
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
new file mode 100644
index 0000000..2fe6643
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapFactory;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Table map for blocklet
+ */
+public class BlockletDataMapFactory implements DataMapFactory {
+
+ private AbsoluteTableIdentifier identifier;
+
+ private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+
+ private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+
+ public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+ this.identifier = identifier;
+ cache = CacheProvider.getInstance()
+ .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
+ }
+
+ public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+ return null;
+ }
+
+ public List<DataMap> getDataMaps(String segmentId) {
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ segmentMap.get(segmentId);
+ if (tableBlockIndexUniqueIdentifiers == null) {
+ tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+ String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId;
+ FileFactory.FileType fileType = FileFactory.getFileType(path);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(".carbonindex");
+ }
+ });
+ for (int i = 0; i < listFiles.length; i++) {
+ tableBlockIndexUniqueIdentifiers.add(
+ new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName()));
+ }
+ // remember the identifiers so that clear(segmentId) can evict them later
+ segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers);
+ }
+
+ try {
+ return cache.getAll(tableBlockIndexUniqueIdentifiers);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override public boolean isFiltersSupported(FilterType filterType) {
+ return true;
+ }
+
+ public void clear(String segmentId) {
+ List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
+ if (blockIndexes != null) {
+ for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
+ DataMap dataMap = cache.getIfPresent(blockIndex);
+ if (dataMap != null) {
+ dataMap.clear();
+ cache.invalidate(blockIndex);
+ }
+ }
+ }
+ }
+
+ @Override public void clear() {
+ // copy the key set to avoid ConcurrentModificationException,
+ // since clear(segmentId) removes entries from segmentMap
+ for (String segmentId : new ArrayList<>(segmentMap.keySet())) {
+ clear(segmentId);
+ }
+ }
+
+ @Override public DataMap getDataMap(DataMapDistributable distributable) {
+ return null;
+ }
+
+ @Override public void fireEvent(ChangeEvent event) {
+
+ }
+}
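A hedged sketch of how the factory is meant to be driven; the segment id "0" and the
datamap name are sample values, and identifier/filterExp are assumed to be prepared
elsewhere (an AbsoluteTableIdentifier and a FilterResolverIntf respectively).

    // wire the factory, then prune every datamap of one segment
    BlockletDataMapFactory factory = new BlockletDataMapFactory();
    factory.init(identifier, "blocklet");              // datamap name is illustrative
    List<DataMap> dataMaps = factory.getDataMaps("0"); // one DataMap per index file, cached
    List<Blocklet> hits = new ArrayList<>();
    for (DataMap dataMap : dataMaps) {
      hits.addAll(dataMap.prune(filterExp));           // collect surviving blocklets
    }
    factory.clear("0");                                // evict the segment from the cache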
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
new file mode 100644
index 0000000..5509c75
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+
+/**
+ * wrapper for blocklet data map data
+ */
+public class BlockletDataRefNodeWrapper implements DataRefNode {
+
+ private List<TableBlockInfo> blockInfos;
+
+ private int index;
+
+ private int[] dimensionLens;
+
+ private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
+
+ public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
+ int[] dimensionLens) {
+ this.blockInfos = blockInfos;
+ this.index = index;
+ this.dimensionLens = dimensionLens;
+ }
+
+ @Override public DataRefNode getNextDataRefNode() {
+ if (index + 1 < blockInfos.size()) {
+ return new BlockletDataRefNodeWrapper(blockInfos, index + 1, dimensionLens);
+ }
+ return null;
+ }
+
+ @Override public int nodeSize() {
+ return blockInfos.get(index).getDetailInfo().getRowCount();
+ }
+
+ @Override public long nodeNumber() {
+ return index;
+ }
+
+ @Override public byte[][] getColumnsMaxValue() {
+ return null;
+ }
+
+ @Override public byte[][] getColumnsMinValue() {
+ return null;
+ }
+
+ @Override
+ public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
+ throws IOException {
+ DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+ return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes);
+ }
+
+ @Override
+ public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndexes)
+ throws IOException {
+ DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+ return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndexes);
+ }
+
+ @Override
+ public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+ throws IOException {
+ MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+ return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes);
+ }
+
+ @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
+ throws IOException {
+ MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+ return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex);
+ }
+
+ private DimensionColumnChunkReader getDimensionColumnChunkReader() throws IOException {
+ ColumnarFormatVersion version =
+ ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+ return CarbonDataReaderFactory.getInstance()
+ .getDimensionColumnChunkReader(version,
+ blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens,
+ blockInfos.get(index).getFilePath());
+ }
+
+ private MeasureColumnChunkReader getMeasureColumnChunkReader() throws IOException {
+ ColumnarFormatVersion version =
+ ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+ return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version,
+ blockInfos.get(index).getDetailInfo().getBlockletInfo(),
+ blockInfos.get(index).getFilePath());
+ }
+
+ @Override
+ public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) {
+ this.deleteDeltaDataCache = deleteDeltaDataCache;
+ }
+
+ @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+ return deleteDeltaDataCache;
+ }
+
+ @Override public int numberOfPages() {
+ return blockInfos.get(index).getDetailInfo().getPagesCount();
+ }
+
+ public int numberOfNodes() {
+ return blockInfos.size();
+ }
+}
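Because each wrapper only knows its own position, callers walk the blocklet chain
through getNextDataRefNode. A minimal sketch, assuming blockInfos (with detail info
populated) and dimensionLens are already built:

    // iterate every blocklet node of a block list and sum the rows
    DataRefNode node = new BlockletDataRefNodeWrapper(blockInfos, 0, dimensionLens);
    long totalRows = 0;
    while (node != null) {
      totalRows += node.nodeSize();     // row count of the current blocklet
      node = node.getNextDataRefNode(); // null after the last blocklet
    }
    System.out.println("total rows: " + totalRows);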
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
new file mode 100644
index 0000000..b8cffc6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Wrapper of abstract index.
+ * TODO: this class could be removed after refactoring.
+ */
+public class IndexWrapper extends AbstractIndex {
+
+ public IndexWrapper(List<TableBlockInfo> blockInfos) {
+ DataFileFooter fileFooter = null;
+ try {
+ fileFooter = CarbonUtil.readMetadatFile(blockInfos.get(0));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(),
+ fileFooter.getSegmentInfo().getColumnCardinality());
+ dataRefNode = new BlockletDataRefNodeWrapper(blockInfos, 0,
+ segmentProperties.getDimensionColumnsValueSize());
+ }
+
+ @Override public void buildIndex(List<DataFileFooter> footerList) {
+ }
+}
[4/7] carbondata git commit: [CARBONDATA-1232] Datamap implementation
for Blocklet
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index e4d3ba5..8aa18d5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -17,14 +17,31 @@
package org.apache.carbondata.presto.impl;
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.*;
-import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.DataRefNodeFinder;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.BlockletInfos;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -52,18 +69,24 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CacheClient;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.thrift.TBase;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.TableNotFoundException;
@@ -392,8 +415,9 @@ public class CarbonTableReader {
TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
if (IUDTable) {
- if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
- updateStatusManager)) {
+ if (CarbonUtil
+ .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
continue;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 277005b..da0d082 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -71,7 +72,7 @@ class CarbonIUDMergerRDD[K, V](
var blocksOfLastSegment: List[TableBlockInfo] = null
- CarbonInputFormat.setSegmentsToAccess(
+ CarbonTableInputFormat.setSegmentsToAccess(
job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
// get splits
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 5c0e77d..1a8183c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -43,8 +43,8 @@ import org.apache.carbondata.core.mutate.UpdateVO
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -293,7 +293,7 @@ class CarbonMergerRDD[K, V](
for (eachSeg <- carbonMergerMapping.validSegments) {
// map for keeping the relation of a task and its blocks.
- job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+ job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
if (updateStatusManager.getUpdateStatusDetails.length != 0) {
updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg)
@@ -315,7 +315,8 @@ class CarbonMergerRDD[K, V](
updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
)
((!updated) || ((updated) && (!CarbonUtil
- .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager))))
+ .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+ updateDetails, updateStatusManager))))
})
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 85c4bc4..c383779 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.hadoop._
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.spark.load.CarbonLoaderUtil
@@ -246,22 +247,23 @@ class CarbonScanRDD(
iterator.asInstanceOf[Iterator[InternalRow]]
}
- private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
- CarbonInputFormat.setTableInfo(conf, tableInfo)
+ private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
+ CarbonTableInputFormat.setTableInfo(conf, tableInfo)
createInputFormat(conf)
}
- private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
- CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
- CarbonInputFormat.setTableInfo(conf, getTableInfo)
+ private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
+ CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
+ CarbonTableInputFormat.setTableInfo(conf, tableInfo)
createInputFormat(conf)
}
- private def createInputFormat(conf: Configuration): CarbonInputFormat[Object] = {
- val format = new CarbonInputFormat[Object]
- CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath))
- CarbonInputFormat.setFilterPredicates(conf, filterExpression)
- CarbonInputFormat.setColumnProjection(conf, columnProjection)
+ private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
+ val format = new CarbonTableInputFormat[Object]
+ CarbonTableInputFormat.setTablePath(conf,
+ identifier.appendWithLocalPrefix(identifier.getTablePath))
+ CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
format
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 2ca3b8c..4950227 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
/**
@@ -38,8 +38,8 @@ object QueryPlanUtil {
* createCarbonInputFormat from query model
*/
def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+ (CarbonTableInputFormat[Array[Object]], Job) = {
+ val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -47,8 +47,8 @@ object QueryPlanUtil {
}
def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonInputFormat[V] = {
- val carbonInputFormat = new CarbonInputFormat[V]()
+ conf: Configuration) : CarbonTableInputFormat[V] = {
+ val carbonInputFormat = new CarbonTableInputFormat[V]()
val job: Job = new Job(conf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
carbonInputFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 71ef6a6..5d931b8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
@@ -90,16 +91,17 @@ private[sql] case class CarbonDatasourceHadoopRelation(
filters.flatMap { filter =>
CarbonFilters.createCarbonFilter(dataSchema, filter)
}.reduceOption(new AndExpression(_, _))
- .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+ .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _))
val projection = new CarbonProjection
requiredColumns.foreach(projection.addColumn)
- CarbonInputFormat.setColumnProjection(conf, projection)
- CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
+ CarbonTableInputFormat.setColumnProjection(conf, projection)
+ CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
+
new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
new SerializableConfiguration(conf),
absIdentifier,
- classOf[CarbonInputFormat[Row]],
+ classOf[CarbonTableInputFormat[Row]],
classOf[Row]
)
}
@@ -119,7 +121,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
@transient sc: SparkContext,
conf: SerializableConfiguration,
identifier: AbsoluteTableIdentifier,
- inputFormatClass: Class[_ <: CarbonInputFormat[V]],
+ inputFormatClass: Class[_ <: CarbonTableInputFormat[V]],
valueClass: Class[V])
extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index a292cde..c38f0e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -133,13 +133,6 @@ private[sql] case class ProjectForUpdateCommand(
override def run(sqlContext: SQLContext): Seq[Row] = {
-
- // sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
- // .EXECUTION_ID_KEY, null)
- // DataFrame(sqlContext, plan).show(truncate = false)
- // return Seq.empty
-
-
val res = plan find {
case relation: LogicalRelation if (relation.relation
.isInstanceOf[CarbonDatasourceRelation]) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 70c7caf..e0a8b58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
/**
* All the utility functions for carbon plan creation
@@ -37,8 +37,8 @@ object QueryPlanUtil {
* createCarbonInputFormat from query model
*/
def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+ (CarbonTableInputFormat[Array[Object]], Job) = {
+ val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -46,8 +46,8 @@ object QueryPlanUtil {
}
def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonInputFormat[V] = {
- val carbonInputFormat = new CarbonInputFormat[V]()
+ conf: Configuration) : CarbonTableInputFormat[V] = {
+ val carbonInputFormat = new CarbonTableInputFormat[V]()
val job: Job = new Job(conf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
carbonInputFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 1788ccb..08b8600 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.merger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -378,4 +379,35 @@ public class CarbonCompactionUtil {
}
return restructuredBlockExists;
}
+
+ /**
+ * Checks whether any restructured block exists among the blocks selected for compaction.
+ *
+ * @param segmentMapping mapping of segment id to its task block info
+ * @param tableLastUpdatedTime last schema modification time of the table
+ * @return true if at least one restructured block is found
+ */
+ public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+ long tableLastUpdatedTime) {
+ boolean restructuredBlockExists = false;
+ for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+ String segmentId = taskMap.getKey();
+ TaskBlockInfo taskBlockInfo = taskMap.getValue();
+ Collection<List<TableBlockInfo>> infoList = taskBlockInfo.getAllTableBlockInfoList();
+ for (List<TableBlockInfo> listMetadata : infoList) {
+ for (TableBlockInfo blockInfo : listMetadata) {
+ // if schema modified timestamp is greater than footer stored schema timestamp,
+ // it indicates it is a restructured block
+ if (tableLastUpdatedTime > blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp()) {
+ restructuredBlockExists = true;
+ break;
+ }
+ }
+ }
+ if (restructuredBlockExists) {
+ break;
+ }
+ }
+ return restructuredBlockExists;
+ }
}
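The detection rule reduces to one timestamp comparison; a self-contained
illustration with invented times:

    // a block whose footer recorded the schema time before the table's
    // last ALTER counts as restructured (sample values only)
    long tableLastUpdatedTime = 2000L; // schema altered at t=2000
    long footerSchemaTime = 1000L;     // block written before the alter
    boolean restructured = tableLastUpdatedTime > footerSchemaTime; // true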
[2/7] carbondata git commit: [CARBONDATA-1301] change command to
update schema and data separately
Posted by ja...@apache.org.
[CARBONDATA-1301] change command to update schema and data separately
This closes #1160
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2dbfab64
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2dbfab64
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2dbfab64
Branch: refs/heads/master
Commit: 2dbfab64103f139de9cfb7ad683c877e9802c562
Parents: 042a05a
Author: jackylk <ja...@huawei.com>
Authored: Wed Jul 12 00:40:44 2017 +0800
Committer: Raghunandan S <ca...@gmail.com>
Committed: Thu Jul 27 18:47:52 2017 +0800
----------------------------------------------------------------------
.../execution/command/carbonTableSchema.scala | 28 ++++++++++----------
.../spark/sql/hive/CarbonFileMetastore.scala | 3 +--
2 files changed, 15 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2dbfab64/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 1781477..f3baf58 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -233,10 +233,10 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
sparkSession.sql(
s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
+ |(${ fields.map(f => f.rawSchema).mkString(",") })
+ |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+ s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+ s""""$tablePath"$carbonSchemaString) """)
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -268,8 +268,8 @@ case class DeleteLoadsById(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadById(
loadids,
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -293,8 +293,8 @@ case class DeleteLoadsByLoadDate(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadByDate(
loadDate,
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -751,10 +751,10 @@ case class LoadTable(
}
GlobalDictionaryUtil.generateGlobalDictionary(
- sparkSession.sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- dictionaryDataFrame)
+ sparkSession.sqlContext,
+ carbonLoadModel,
+ relation.tableMeta.storePath,
+ dictionaryDataFrame)
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
@@ -847,8 +847,8 @@ case class ShowLoads(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.showSegments(
getDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2dbfab64/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 048681c..549841b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -430,8 +430,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
// while drop we should refresh the schema modified time so that if any thing has changed
// in the other beeline need to update.
checkSchemasModifiedTimeAndReloadTables
- val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
- CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+
val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
tableIdentifier.table)
metadataToBeRemoved match {
[3/7] carbondata git commit: [CARBONDATA-1284]Implement hive based
schema storage in carbon
Posted by ja...@apache.org.
[CARBONDATA-1284]Implement hive based schema storage in carbon
This closes #1149
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/042a05a5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/042a05a5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/042a05a5
Branch: refs/heads/master
Commit: 042a05a58cec223086ad55ad48ca27e34c40d135
Parents: 92fe63c
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jul 8 17:14:04 2017 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Thu Jul 27 18:47:52 2017 +0800
----------------------------------------------------------------------
.../execution/command/carbonTableSchema.scala | 30 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 384 ++++++++++---------
2 files changed, 225 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/042a05a5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 46b58c5..1781477 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -233,10 +233,10 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
sparkSession.sql(
s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath" $carbonSchemaString) """)
+ |(${ fields.map(f => f.rawSchema).mkString(",") })
+ |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+ s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+ s""""$tablePath"$carbonSchemaString) """)
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -268,8 +268,8 @@ case class DeleteLoadsById(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadById(
loadids,
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -293,8 +293,8 @@ case class DeleteLoadsByLoadDate(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadByDate(
loadDate,
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -750,11 +750,11 @@ case class LoadTable(
(dataFrame, dataFrame)
}
- GlobalDictionaryUtil.generateGlobalDictionary(
- sparkSession.sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- dictionaryDataFrame)
+ GlobalDictionaryUtil.generateGlobalDictionary(
+ sparkSession.sqlContext,
+ carbonLoadModel,
+ relation.tableMeta.storePath,
+ dictionaryDataFrame)
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
@@ -847,8 +847,8 @@ case class ShowLoads(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.showSegments(
getDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/042a05a5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 2407054..048681c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,23 +22,22 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -63,7 +62,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
}
}
-class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
@transient
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -78,7 +77,7 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
System.nanoTime() + ""
}
- val metadata = MetaData(new ArrayBuffer[TableMeta]())
+ lazy val metadata = loadMetadata(storePath, nextQueryId)
/**
@@ -91,22 +90,9 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
override def createCarbonRelation(parameters: Map[String, String],
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): CarbonRelation = {
- val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
- val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
- val tables = getTableFromMetadataCache(database, tableName)
- tables match {
- case Some(t) =>
- CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
- case None =>
- readCarbonSchema(absIdentifier) match {
- case Some(meta) =>
- CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
- case None =>
- throw new NoSuchTableException(database, tableName)
- }
- }
+ lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
+ Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
+ .asInstanceOf[CarbonRelation]
}
def lookupRelation(dbName: Option[String], tableName: String)
@@ -114,21 +100,20 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
}
- override def lookupRelation(tableIdentifier: TableIdentifier)
+ def lookupRelation(tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): LogicalPlan = {
+ checkSchemasModifiedTimeAndReloadTables()
val database = tableIdentifier.database.getOrElse(
- sparkSession.catalog.currentDatabase)
- val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
- case SubqueryAlias(_,
- LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
- _) =>
- carbonDatasourceHadoopRelation.carbonRelation
- case LogicalRelation(
- carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
- carbonDatasourceHadoopRelation.carbonRelation
- case _ => throw new NoSuchTableException(database, tableIdentifier.table)
+ sparkSession.catalog.currentDatabase
+ )
+ val tables = getTableFromMetadata(database, tableIdentifier.table, true)
+ tables match {
+ case Some(t) =>
+ CarbonRelation(database, tableIdentifier.table,
+ CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
+ case None =>
+ throw new NoSuchTableException(database, tableIdentifier.table)
}
- relation
}
/**
@@ -138,7 +123,8 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
* @param tableName
* @return
*/
- def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
+ def getTableFromMetadata(database: String,
+ tableName: String, readStore: Boolean = false): Option[TableMeta] = {
metadata.tablesMeta
.find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
@@ -150,48 +136,99 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
tableExists(TableIdentifier(table, databaseOp))(sparkSession)
}
- override def tableExists(tableIdentifier: TableIdentifier)
- (sparkSession: SparkSession): Boolean = {
- try {
- lookupRelation(tableIdentifier)(sparkSession)
- } catch {
- case e: Exception =>
- return false
+ def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+ checkSchemasModifiedTimeAndReloadTables()
+ val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val tables = metadata.tablesMeta.filter(
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+ tables.nonEmpty
+ }
+
+ def loadMetadata(metadataPath: String, queryId: String): MetaData = {
+ val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+ val statistic = new QueryStatistic()
+ // create the zookeeper instance once,
+ // if zookeeper is configured as the carbon lock type.
+ val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
+ if (null != zookeeperurl) {
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+ }
+ if (metadataPath == null) {
+ return null
+ }
+ // if no lock type is configured and the store type is HDFS, set HDFS lock as the default
+ if (null == CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+ FileType.HDFS == FileFactory.getFileType(metadataPath)) {
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.LOCK_TYPE,
+ CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
+ )
+ LOGGER.info("Default lock type HDFSLOCK is configured")
}
- true
+ val fileType = FileFactory.getFileType(metadataPath)
+ val metaDataBuffer = new ArrayBuffer[TableMeta]
+ fillMetaData(metadataPath, fileType, metaDataBuffer)
+ updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
+ statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
+ System.currentTimeMillis())
+ recorder.recordStatisticsForDriver(statistic, queryId)
+ MetaData(metaDataBuffer)
}
- private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
- val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
- val tableName = identifier.getCarbonTableIdentifier.getTableName
- val storePath = identifier.getStorePath
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
- tableName.toLowerCase(), UUID.randomUUID().toString)
- val carbonTablePath =
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val fileType = FileFactory.getFileType(tableMetadataFile)
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableUniqueName = dbName + "_" + tableName
- val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
- val schemaFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
- wrapperTableInfo.setStorePath(storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
- identifier.getStorePath,
- identifier.getTablePath,
- carbonTable)
- metadata.tablesMeta += tableMeta
- Some(tableMeta)
- } else {
- None
+ private def fillMetaData(basePath: String, fileType: FileType,
+ metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
+ val databasePath = basePath // + "/schemas"
+ try {
+ if (FileFactory.isFileExist(databasePath, fileType)) {
+ val file = FileFactory.getCarbonFile(databasePath, fileType)
+ val databaseFolders = file.listFiles()
+
+ databaseFolders.foreach(databaseFolder => {
+ if (databaseFolder.isDirectory) {
+ val dbName = databaseFolder.getName
+ val tableFolders = databaseFolder.listFiles()
+
+ tableFolders.foreach(tableFolder => {
+ if (tableFolder.isDirectory) {
+ val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
+ tableFolder.getName, UUID.randomUUID().toString)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
+ carbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val tableName = tableFolder.getName
+ val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
+ val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
+ val schemaFilePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+ wrapperTableInfo.setStorePath(storePath)
+ wrapperTableInfo
+ .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
+ carbonTable)
+ }
+ }
+ })
+ }
+ })
+ } else {
+ // Create folders and files.
+ FileFactory.mkdirs(databasePath, fileType)
+ }
+ } catch {
+ case s: java.io.FileNotFoundException =>
+ s.printStackTrace()
+ // Create folders and files.
+ FileFactory.mkdirs(databasePath, fileType)
}
}
@@ -201,15 +238,15 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
* @param newTableIdentifier
* @param thriftTableInfo
* @param schemaEvolutionEntry
- * @param tablePath
+ * @param carbonStorePath
* @param sparkSession
*/
def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
- tablePath: String) (sparkSession: SparkSession): String = {
- val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ carbonStorePath: String)
+ (sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
@@ -218,19 +255,11 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
.fromExternalToWrapperTableInfo(thriftTableInfo,
newTableIdentifier.getDatabaseName,
newTableIdentifier.getTableName,
- absoluteTableIdentifier.getStorePath)
- val identifier =
- new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName,
- wrapperTableInfo.getFactTable.getTableId)
- val path = createSchemaThriftFile(wrapperTableInfo,
+ carbonStorePath)
+ createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- identifier)
- addTableCache(wrapperTableInfo,
- AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName))
- path
+ newTableIdentifier.getTableName)(sparkSession)
}
/**
@@ -238,27 +267,25 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param tablePath
+ * @param carbonStorePath
* @param sparkSession
*/
def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- tablePath: String)(sparkSession: SparkSession): String = {
- val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ carbonStorePath: String)
+ (sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
carbonTableIdentifier.getDatabaseName,
carbonTableIdentifier.getTableName,
- tableIdentifier.getStorePath)
+ carbonStorePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
- wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
- val path = createSchemaThriftFile(wrapperTableInfo,
+ createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- tableIdentifier.getCarbonTableIdentifier)
- addTableCache(wrapperTableInfo, tableIdentifier)
- path
+ carbonTableIdentifier.getDatabaseName,
+ carbonTableIdentifier.getTableName)(sparkSession)
}
@@ -269,38 +296,24 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
* Load CarbonTable from wrapper tableInfo
*
*/
- def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
+ def createTableFromThrift(
+ tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+ dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
+ if (tableExists(tableName, Some(dbName))(sparkSession)) {
+ sys.error(s"Table [$tableName] already exists under Database [$dbName]")
+ }
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val dbName = tableInfo.getDatabaseName
- val tableName = tableInfo.getFactTable.getTableName
val thriftTableInfo = schemaConverter
.fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- tableInfo.setStorePath(identifier.getStorePath)
- createSchemaThriftFile(tableInfo,
+ thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+ .add(schemaEvolutionEntry)
+ val carbonTablePath = createSchemaThriftFile(tableInfo,
thriftTableInfo,
- identifier.getCarbonTableIdentifier)
+ dbName,
+ tableName)(sparkSession)
LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
- }
-
- /**
- * Generates schema string from TableInfo
- */
- override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
- tablePath: String): String = {
- val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
- val schemaMetadataPath =
- CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
- tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(tableIdentifier.getStorePath)
- val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
- schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
- tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
- removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
- CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- addTableCache(tableInfo, tableIdentifier)
- CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
+ (carbonTablePath, "")
}
/**
@@ -308,16 +321,23 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
*
* @param tableInfo
* @param thriftTableInfo
+ * @param dbName
+ * @param tableName
+ * @param sparkSession
* @return
*/
- private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
- thriftTableInfo: TableInfo,
- carbonTableIdentifier: CarbonTableIdentifier): String = {
- val carbonTablePath = CarbonStorePath.
- getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
+ private def createSchemaThriftFile(
+ tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo,
+ dbName: String, tableName: String)
+ (sparkSession: SparkSession): String = {
+ val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
+ tableInfo.getFactTable.getTableId)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
+ tableInfo.setStorePath(storePath)
val fileType = FileFactory.getFileType(schemaMetadataPath)
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -326,20 +346,13 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
thriftWriter.open(FileWriteOperation.OVERWRITE)
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
- carbonTablePath.getPath
- }
-
- protected def addTableCache(tableInfo: table.TableInfo,
- absoluteTableIdentifier: AbsoluteTableIdentifier) = {
- val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
- CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
- removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
+ removeTableFromMetadata(dbName, tableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
- absoluteTableIdentifier.getTablePath,
- CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
+ val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+ CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
metadata.tablesMeta += tableMeta
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+ carbonTablePath.getPath
}
/**
@@ -349,15 +362,13 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
* @param tableName
*/
def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
metadataToBeRemoved match {
case Some(tableMeta) =>
metadata.tablesMeta -= tableMeta
CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
case None =>
- if (LOGGER.isDebugEnabled) {
- LOGGER.debug(s"No entry for table $tableName in database $dbName")
- }
+ LOGGER.debug(s"No entry for table $tableName in database $dbName")
}
}
@@ -391,23 +402,23 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- try {
- val tablePath = lookupRelation(tableIdentifier)(sparkSession).
- asInstanceOf[CarbonRelation].tableMeta.tablePath
- val fileType = FileFactory.getFileType(tablePath)
- FileFactory.isFileExist(tablePath, fileType)
- } catch {
- case e: Exception =>
- false
- }
+ val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val tableName = tableIdentifier.table.toLowerCase
+
+ val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
+ new CarbonTableIdentifier(dbName, tableName, "")).getPath
+
+ val fileType = FileFactory.getFileType(tablePath)
+ FileFactory.isFileExist(tablePath, fileType)
}
- def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+ def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
(sparkSession: SparkSession) {
val dbName = tableIdentifier.database.get
val tableName = tableIdentifier.table
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
+
+ val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+ new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
@@ -418,18 +429,27 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
// while dropping we should refresh the schema modified time so that if anything has changed
// in another beeline session it gets updated.
- checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
-
- removeTableFromMetadata(dbName, tableName)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
+ checkSchemasModifiedTimeAndReloadTables
+ val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+ CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
+ tableIdentifier.table)
+ metadataToBeRemoved match {
+ case Some(tableMeta) =>
+ metadata.tablesMeta -= tableMeta
+ CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+ case None =>
+ LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
+ }
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
}
}
- private def getTimestampFileAndType(basePath: String) = {
- val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+ private def getTimestampFileAndType(databaseName: String, tableName: String) = {
+ val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
val timestampFileType = FileFactory.getFileType(timestampFile)
(timestampFile, timestampFileType)
}
@@ -443,20 +463,37 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
tableModifiedTimeStore.put("default", timeStamp)
}
- def updateAndTouchSchemasUpdatedTime(basePath: String) {
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
+ def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
}
+ /**
+ * This method will read the timestamp of the empty schema file
+ *
+ * @param databaseName
+ * @param tableName
+ * @return
+ */
+ private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+ if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+ FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+ } else {
+ System.currentTimeMillis()
+ }
+ }
/**
* This method will check and create an empty schema timestamp file
*
+ * @param databaseName
+ * @param tableName
* @return
*/
- private def touchSchemaFileSystemTime(basePath: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
+ private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.audit(s"Creating timestamp file for $basePath")
+ LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
FileFactory.createNewFile(timestampFile, timestampFileType)
}
val systemTime = System.currentTimeMillis()
@@ -465,9 +502,8 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
systemTime
}
- def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
- val (timestampFile, timestampFileType) =
- getTimestampFileAndType(storePath)
+ def checkSchemasModifiedTimeAndReloadTables() {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
getLastModifiedTime ==
@@ -478,7 +514,7 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
}
private def refreshCache() {
- metadata.tablesMeta.clear()
+ metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
}
override def isReadFromHiveMetaStore: Boolean = false
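One detail worth calling out in the refactor above: cross-session cache invalidation rides on a single schemas-modified-time file under the store path. Every schema write touches that file (touchSchemaFileSystemTime), and every lookup first compares the cached timestamp against the file's last-modified time, reloading the metadata on mismatch (checkSchemasModifiedTimeAndReloadTables). A minimal standalone sketch of that pattern with plain java.io; SchemaCache, cachedTimestamp and reload() are illustrative names, not carbondata API:

import java.io.File;
import java.io.IOException;

class SchemaCache {
  private final File timestampFile;  // the SCHEMAS_MODIFIED_TIME_FILE under the store path
  private long cachedTimestamp;

  SchemaCache(File timestampFile) throws IOException {
    this.timestampFile = timestampFile;
    if (!timestampFile.exists()) {
      timestampFile.createNewFile();          // carbon creates it on first touch
    }
    this.cachedTimestamp = timestampFile.lastModified();
  }

  // mirrors checkSchemasModifiedTimeAndReloadTables(): call before every lookup
  void checkAndReload() {
    long onDisk = timestampFile.lastModified();
    if (onDisk != cachedTimestamp) {          // another session touched a schema
      cachedTimestamp = onDisk;
      reload();
    }
  }

  // mirrors touchSchemaFileSystemTime(): call after every schema write
  void touch() {
    cachedTimestamp = System.currentTimeMillis();
    timestampFile.setLastModified(cachedTimestamp);
  }

  private void reload() { /* re-scan the store path and rebuild tablesMeta */ }
}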
[5/7] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
new file mode 100644
index 0000000..defe766
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+
+/**
+ * It is just a normal row to store data. Implementations can be safe (on-heap) or unsafe (off-heap).
+ * TODO: move this class to a global row and use it across loading once DataType is changed to a class
+ */
+public abstract class DataMapRow {
+
+ protected DataMapSchema[] schemas;
+
+ public DataMapRow(DataMapSchema[] schemas) {
+ this.schemas = schemas;
+ }
+
+ public abstract byte[] getByteArray(int ordinal);
+
+ public abstract DataMapRow getRow(int ordinal);
+
+ public abstract void setRow(DataMapRow row, int ordinal);
+
+ public abstract void setByteArray(byte[] byteArray, int ordinal);
+
+ public abstract int getInt(int ordinal);
+
+ public abstract void setInt(int value, int ordinal);
+
+ public abstract void setByte(byte value, int ordinal);
+
+ public abstract byte getByte(int ordinal);
+
+ public abstract void setShort(short value, int ordinal);
+
+ public abstract short getShort(int ordinal);
+
+ public abstract void setLong(long value, int ordinal);
+
+ public abstract long getLong(int ordinal);
+
+ public abstract void setFloat(float value, int ordinal);
+
+ public abstract float getFloat(int ordinal);
+
+ public abstract void setDouble(double value, int ordinal);
+
+ public abstract double getDouble(int ordinal);
+
+ public int getTotalSizeInBytes() {
+ int len = 0;
+ for (int i = 0; i < schemas.length; i++) {
+ len += getSizeInBytes(i);
+ }
+ return len;
+ }
+
+ public int getSizeInBytes(int ordinal) {
+ switch (schemas[ordinal].getSchemaType()) {
+ case FIXED:
+ return schemas[ordinal].getLength();
+ case VARIABLE:
+ return getByteArray(ordinal).length + 2;
+ case STRUCT:
+ return getRow(ordinal).getTotalSizeInBytes();
+ default:
+ throw new UnsupportedOperationException("wrong type");
+ }
+ }
+
+ public int getColumnCount() {
+ return schemas.length;
+ }
+}
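The size accounting above is the contract every row implementation has to honour: FIXED columns occupy their declared length, VARIABLE columns occupy their payload plus a 2-byte (short) length prefix, and STRUCT columns occupy the total of their child row. A standalone restatement of that arithmetic, illustrative only, not part of the commit:

// Illustrative restatement of DataMapRow.getSizeInBytes; not carbondata code.
static int sizeOf(String schemaType, int fixedLen, int payloadLen, int childTotal) {
  switch (schemaType) {
    case "FIXED":    return fixedLen;          // e.g. 4 for an int column
    case "VARIABLE": return payloadLen + 2;    // short length prefix + payload bytes
    case "STRUCT":   return childTotal;        // sum over the child row's columns
    default: throw new UnsupportedOperationException("wrong type");
  }
}
// A row (int, 7-byte array, struct{long,long}) occupies 4 + (7 + 2) + 16 = 25 bytes.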
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
new file mode 100644
index 0000000..adec346
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Data map row.
+ */
+public class DataMapRowImpl extends DataMapRow {
+
+ private Object[] data;
+
+ public DataMapRowImpl(DataMapSchema[] schemas) {
+ super(schemas);
+ this.data = new Object[schemas.length];
+ }
+
+ @Override public byte[] getByteArray(int ordinal) {
+ return (byte[]) data[ordinal];
+ }
+
+ @Override public DataMapRow getRow(int ordinal) {
+ return (DataMapRow) data[ordinal];
+ }
+
+ @Override public void setByteArray(byte[] byteArray, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.BYTE_ARRAY);
+ data[ordinal] = byteArray;
+ }
+
+ @Override public int getInt(int ordinal) {
+ return (Integer) data[ordinal];
+ }
+
+ @Override public void setInt(int value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.INT);
+ data[ordinal] = value;
+ }
+
+ @Override public void setByte(byte value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.BYTE);
+ data[ordinal] = value;
+ }
+
+ @Override public byte getByte(int ordinal) {
+ return (Byte) data[ordinal];
+ }
+
+ @Override public void setShort(short value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.SHORT);
+ data[ordinal] = value;
+ }
+
+ @Override public short getShort(int ordinal) {
+ return (Short) data[ordinal];
+ }
+
+ @Override public void setLong(long value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.LONG);
+ data[ordinal] = value;
+ }
+
+ @Override public long getLong(int ordinal) {
+ return (Long) data[ordinal];
+ }
+
+ @Override public void setFloat(float value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.FLOAT);
+ data[ordinal] = value;
+ }
+
+ @Override public float getFloat(int ordinal) {
+ return (Float) data[ordinal];
+ }
+
+ @Override public void setDouble(double value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.DOUBLE);
+ data[ordinal] = value;
+ }
+
+ @Override public void setRow(DataMapRow row, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.STRUCT);
+ data[ordinal] = row;
+ }
+
+ @Override public double getDouble(int ordinal) {
+ return (Double) data[ordinal];
+ }
+
+}
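A hedged usage sketch for the heap row; it leans on the DataMapSchema classes added further down in this commit, and assumes carbondata-core on the classpath and a 4-byte DataType.INT:

import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
import org.apache.carbondata.core.metadata.datatype.DataType;

public class DataMapRowExample {               // illustrative driver, not part of the commit
  public static void main(String[] args) {
    DataMapSchema[] schemas = new DataMapSchema[] {
        new DataMapSchema.FixedDataMapSchema(DataType.INT),          // fixed-size column
        new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY) // length-prefixed column
    };
    DataMapRowImpl row = new DataMapRowImpl(schemas);
    row.setInt(42, 0);                          // asserts DataType.INT at ordinal 0
    row.setByteArray("min_key".getBytes(), 1);  // asserts DataType.BYTE_ARRAY at ordinal 1
    // 4 (fixed int) + 7 + 2 (payload + short length prefix) = 13
    System.out.println(row.getTotalSizeInBytes());
  }
}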
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
new file mode 100644
index 0000000..ef78514
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Unsafe implementation of data map row.
+ */
+public class UnsafeDataMapRow extends DataMapRow {
+
+ private MemoryBlock block;
+
+ private int pointer;
+
+ public UnsafeDataMapRow(DataMapSchema[] schemas, MemoryBlock block, int pointer) {
+ super(schemas);
+ this.block = block;
+ this.pointer = pointer;
+ }
+
+ @Override public byte[] getByteArray(int ordinal) {
+ int length;
+ int position = getPosition(ordinal);
+ switch (schemas[ordinal].getSchemaType()) {
+ case VARIABLE:
+ length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+ position += 2;
+ break;
+ default:
+ length = schemas[ordinal].getLength();
+ }
+ byte[] data = new byte[length];
+ unsafe.copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data,
+ BYTE_ARRAY_OFFSET, data.length);
+ return data;
+ }
+
+ @Override public DataMapRow getRow(int ordinal) {
+ DataMapSchema[] childSchemas =
+ ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
+ return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal));
+ }
+
+ @Override public void setByteArray(byte[] byteArray, int ordinal) {
+ throw new UnsupportedOperationException("Not supported to set on unsafe row");
+ }
+
+ @Override public int getInt(int ordinal) {
+ return unsafe
+ .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setInt(int value, int ordinal) {
+ throw new UnsupportedOperationException("Not supported to set on unsafe row");
+ }
+
+ @Override public void setByte(byte value, int ordinal) {
+ throw new UnsupportedOperationException("Not supported to set on unsafe row");
+ }
+
+ @Override public byte getByte(int ordinal) {
+ return unsafe
+ .getByte(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setShort(short value, int ordinal) {
+ throw new UnsupportedOperationException("Not supported to set on unsafe row");
+ }
+
+ @Override public short getShort(int ordinal) {
+ return unsafe
+ .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setLong(long value, int ordinal) {
+ throw new UnsupportedOperationException("Not supported to set on unsafe row");
+ }
+
+ @Override public long getLong(int ordinal) {
+ return unsafe
+ .getLong(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setFloat(float value, int ordinal) {
+ throw new UnsupportedOperationException("Not supported to set on unsafe row");
+ }
+
+ @Override public float getFloat(int ordinal) {
+ return unsafe
+ .getFloat(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setDouble(double value, int ordinal) {
+ throw new UnsupportedOperationException("Not supported to set on unsafe row");
+ }
+
+ @Override public double getDouble(int ordinal) {
+ return unsafe
+ .getDouble(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setRow(DataMapRow row, int ordinal) {
+ throw new UnsupportedOperationException("Not supported to set on unsafe row");
+ }
+
+ private int getPosition(int ordinal) {
+ int position = 0;
+ for (int i = 0; i < ordinal; i++) {
+ position += getSizeInBytes(i);
+ }
+ return position;
+ }
+}
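Note that the unsafe row stores no per-field offsets: getPosition recomputes the offset of ordinal k by summing the sizes of every preceding column, reading each VARIABLE column's 2-byte prefix out of the memory block along the way, so random access is O(k) per call. A standalone model of that walk over a ByteBuffer instead of a MemoryBlock, illustrative only:

import java.nio.ByteBuffer;

// Illustrative offset walk mirroring UnsafeDataMapRow.getPosition; not the commit's code.
final class OffsetWalk {
  // fixedLen[i] > 0 means a FIXED column of that many bytes; 0 means VARIABLE (short-prefixed)
  static int positionOf(ByteBuffer block, int[] fixedLen, int ordinal) {
    int pos = 0;
    for (int i = 0; i < ordinal; i++) {
      pos += (fixedLen[i] > 0)
          ? fixedLen[i]
          : 2 + block.getShort(pos);   // length prefix + variable payload
    }
    return pos;
  }

  public static void main(String[] args) {
    ByteBuffer block = ByteBuffer.allocate(32);
    block.putInt(0, 7);                        // ordinal 0: FIXED int
    block.putShort(4, (short) 3);              // ordinal 1: VARIABLE, 3-byte payload
    block.put(6, (byte) 'a'); block.put(7, (byte) 'b'); block.put(8, (byte) 'c');
    System.out.println(positionOf(block, new int[] {4, 0, 8}, 2)); // prints 9
  }
}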
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
new file mode 100644
index 0000000..80c68ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Schema of a data map column. There are three types right now: fixed, variable and struct.
+ */
+public abstract class DataMapSchema {
+
+ protected DataType dataType;
+
+ public DataMapSchema(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ /**
+ * Either fixed or variable length.
+ *
+ * @return
+ */
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ /**
+ * Gives the column length: the declared length for a fixed schema, otherwise the data type's size in bytes.
+ *
+ * @return
+ */
+ public abstract int getLength();
+
+ /**
+ * schema type
+ * @return
+ */
+ public abstract DataMapSchemaType getSchemaType();
+
+ /*
+ * It always has a fixed length; the length cannot be updated later.
+ * Usage examples: all primitive types like short, int etc.
+ */
+ public static class FixedDataMapSchema extends DataMapSchema {
+
+ private int length;
+
+ public FixedDataMapSchema(DataType dataType) {
+ super(dataType);
+ }
+
+ public FixedDataMapSchema(DataType dataType, int length) {
+ super(dataType);
+ this.length = length;
+ }
+
+ @Override public int getLength() {
+ if (length == 0) {
+ return dataType.getSizeInBytes();
+ } else {
+ return length;
+ }
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.FIXED;
+ }
+ }
+
+ public static class VariableDataMapSchema extends DataMapSchema {
+
+ public VariableDataMapSchema(DataType dataType) {
+ super(dataType);
+ }
+
+ @Override public int getLength() {
+ return dataType.getSizeInBytes();
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.VARIABLE;
+ }
+ }
+
+ public static class StructDataMapSchema extends DataMapSchema {
+
+ private DataMapSchema[] childSchemas;
+
+ public StructDataMapSchema(DataType dataType, DataMapSchema[] childSchemas) {
+ super(dataType);
+ this.childSchemas = childSchemas;
+ }
+
+ @Override public int getLength() {
+ return dataType.getSizeInBytes();
+ }
+
+ public DataMapSchema[] getChildSchemas() {
+ return childSchemas;
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.STRUCT;
+ }
+ }
+
+ public enum DataMapSchemaType {
+ FIXED, VARIABLE, STRUCT
+ }
+}
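The STRUCT variant is what makes composite index entries possible: a StructDataMapSchema carries child schemas, and a whole child row is stored at the parent ordinal via setRow. A hedged sketch, again assuming the commit's classes on the classpath and a 4-byte DataType.INT:

import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
import org.apache.carbondata.core.metadata.datatype.DataType;

public class StructSchemaExample {             // illustrative driver, not part of the commit
  public static void main(String[] args) {
    DataMapSchema[] minMax = new DataMapSchema[] {
        new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY),  // min key
        new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY)   // max key
    };
    DataMapSchema[] top = new DataMapSchema[] {
        new DataMapSchema.FixedDataMapSchema(DataType.INT),            // e.g. row count
        new DataMapSchema.StructDataMapSchema(DataType.STRUCT, minMax)
    };
    DataMapRowImpl child = new DataMapRowImpl(minMax);
    child.setByteArray(new byte[] {1}, 0);
    child.setByteArray(new byte[] {9}, 1);
    DataMapRowImpl row = new DataMapRowImpl(top);
    row.setInt(1000, 0);
    row.setRow(child, 1);                      // asserts DataType.STRUCT at ordinal 1
    // int(4) + struct child total ((1+2) + (1+2)) = 10
    System.out.println(row.getTotalSizeInBytes());
  }
}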
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
new file mode 100644
index 0000000..9d77010
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+/**
+ * Types of filters in a select query
+ */
+public enum FilterType {
+ EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index bfa9d7e..f81f805 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -17,16 +17,22 @@
package org.apache.carbondata.core.metadata.blocklet;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.hadoop.io.Writable;
+
/**
* class to store the information about the blocklet
*/
-public class BlockletInfo implements Serializable {
+public class BlockletInfo implements Serializable, Writable {
/**
* serialization id
@@ -189,4 +195,49 @@ public class BlockletInfo implements Serializable {
this.numberOfPages = numberOfPages;
}
+ @Override public void write(DataOutput output) throws IOException {
+ output.writeLong(dimensionOffset);
+ output.writeLong(measureOffsets);
+ int dsize = dimensionChunkOffsets != null ? dimensionChunkOffsets.size() : 0;
+ output.writeShort(dsize);
+ for (int i = 0; i < dsize; i++) {
+ output.writeLong(dimensionChunkOffsets.get(i));
+ }
+ for (int i = 0; i < dsize; i++) {
+ output.writeInt(dimensionChunksLength.get(i));
+ }
+ int mSize = measureChunkOffsets != null ? measureChunkOffsets.size() : 0;
+ output.writeShort(mSize);
+ for (int i = 0; i < mSize; i++) {
+ output.writeLong(measureChunkOffsets.get(i));
+ }
+ for (int i = 0; i < mSize; i++) {
+ output.writeInt(measureChunksLength.get(i));
+ }
+ }
+
+ @Override public void readFields(DataInput input) throws IOException {
+ dimensionOffset = input.readLong();
+ measureOffsets = input.readLong();
+ short dimensionChunkOffsetsSize = input.readShort();
+ dimensionChunkOffsets = new ArrayList<>(dimensionChunkOffsetsSize);
+ for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
+ dimensionChunkOffsets.add(input.readLong());
+ }
+ dimensionChunksLength = new ArrayList<>(dimensionChunkOffsetsSize);
+ for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
+ dimensionChunksLength.add(input.readInt());
+ }
+
+ short measureChunkOffsetsSize = input.readShort();
+ measureChunkOffsets = new ArrayList<>(measureChunkOffsetsSize);
+ for (int i = 0; i < measureChunkOffsetsSize; i++) {
+ measureChunkOffsets.add(input.readLong());
+ }
+ measureChunksLength = new ArrayList<>(measureChunkOffsetsSize);
+ for (int i = 0; i < measureChunkOffsetsSize; i++) {
+ measureChunksLength.add(input.readInt());
+ }
+
+ }
}
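Making BlockletInfo a Hadoop Writable is what lets the blocklet datamap ship column chunk offsets and lengths inside the serialized split instead of re-reading file footers at execution time. The wire format above round-trips with plain java.io streams; a minimal sketch assuming carbondata-core plus hadoop-common on the classpath and a default no-arg BlockletInfo constructor:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;

public class BlockletInfoRoundTrip {           // illustrative driver, not part of the commit
  public static void main(String[] args) throws IOException {
    BlockletInfo written = new BlockletInfo(); // null chunk lists serialize as size 0
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    written.write(new DataOutputStream(bos));  // 2 longs + two 0 shorts on the wire

    BlockletInfo read = new BlockletInfo();
    read.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    // offsets, lengths and both list sizes now match the written instance
  }
}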
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
index cd86a07..ae99ed8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.metadata.index;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
/**
@@ -45,6 +46,11 @@ public class BlockIndexInfo {
private BlockletIndex blockletIndex;
/**
+ * to store blocklet info like offsets and lengths of each column.
+ */
+ private BlockletInfo blockletInfo;
+
+ /**
* Constructor
*
* @param numberOfRows number of rows
@@ -61,6 +67,20 @@ public class BlockIndexInfo {
}
/**
+ *
+ * @param numberOfRows
+ * @param fileName
+ * @param offset
+ * @param blockletIndex
+ * @param blockletInfo
+ */
+ public BlockIndexInfo(long numberOfRows, String fileName, long offset,
+ BlockletIndex blockletIndex, BlockletInfo blockletInfo) {
+ this(numberOfRows, fileName, offset, blockletIndex);
+ this.blockletInfo = blockletInfo;
+ }
+
+ /**
* @return the numberOfRows
*/
public long getNumberOfRows() {
@@ -87,4 +107,11 @@ public class BlockIndexInfo {
public BlockletIndex getBlockletIndex() {
return blockletIndex;
}
+
+ /**
+ * @return BlockletInfo
+ */
+ public BlockletInfo getBlockletInfo() {
+ return blockletInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index faa4564..aa11202 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -41,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
@@ -118,23 +121,40 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// so block will be loaded in sorted order this will be required for
// query execution
Collections.sort(queryModel.getTableBlockInfos());
- // get the table blocks
- CacheProvider cacheProvider = CacheProvider.getInstance();
- BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
- (BlockIndexStore) cacheProvider
- .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
- // remove the invalid table blocks, block which is deleted or compacted
- cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
- queryModel.getAbsoluteTableIdentifier());
- List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
- prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
- queryModel.getAbsoluteTableIdentifier());
- cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
- queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
- queryStatistic
- .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
- queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+ if (queryModel.getTableBlockInfos().get(0).getDetailInfo() != null) {
+ List<AbstractIndex> indexList = new ArrayList<>();
+ Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
+ for (TableBlockInfo blockInfo: queryModel.getTableBlockInfos()) {
+ List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
+ if (tableBlockInfos == null) {
+ tableBlockInfos = new ArrayList<>();
+ listMap.put(blockInfo.getFilePath(), tableBlockInfos);
+ }
+ tableBlockInfos.add(blockInfo);
+ }
+ for (List<TableBlockInfo> tableBlockInfos: listMap.values()) {
+ indexList.add(new IndexWrapper(tableBlockInfos));
+ }
+ queryProperties.dataBlocks = indexList;
+ } else {
+ // get the table blocks
+ CacheProvider cacheProvider = CacheProvider.getInstance();
+ BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
+ (BlockIndexStore) cacheProvider
+ .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
+ // remove the invalid table blocks, block which is deleted or compacted
+ cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
+ queryModel.getAbsoluteTableIdentifier());
+ List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
+ prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
+ queryModel.getAbsoluteTableIdentifier());
+ cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
+ queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
+ queryStatistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
+ queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+ }
// calculating the total number of aggregated columns
int aggTypeCount = queryModel.getQueryMeasures().size();
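The new branch above takes effect when the blocklet datamap has already attached BlockletDetailInfo to each TableBlockInfo: rather than hitting the executor B-tree cache, it groups the blocklet entries by carbondata file (a LinkedHashMap preserves the sorted block order) and builds one IndexWrapper per file. The grouping idiom in isolation, as a standalone sketch with illustrative types:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone restatement of the grouping step; BlockInfo stands in for TableBlockInfo.
final class GroupByFile {
  static final class BlockInfo {
    final String filePath;
    BlockInfo(String filePath) { this.filePath = filePath; }
  }

  static Map<String, List<BlockInfo>> groupByFile(List<BlockInfo> sortedBlocks) {
    // LinkedHashMap keeps the blocks' sorted order, so wrappers are built in file order
    Map<String, List<BlockInfo>> byFile = new LinkedHashMap<>();
    for (BlockInfo b : sortedBlocks) {
      byFile.computeIfAbsent(b.filePath, k -> new ArrayList<>()).add(b);
    }
    return byFile;  // one IndexWrapper per entry in the real code
  }
}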
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 8704496..a874835 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -156,7 +156,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
- boolean isScanRequired =
+ boolean isScanRequired = blockIndex >= blkMaxVal.length ||
isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
if (isScanRequired) {
bitSet.set(0);
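The added guard treats a column ordinal that lies beyond the stored min/max arrays as always scan-required, the safe default when an index carries statistics for fewer columns than the query expects; the same guard recurs in the RangeValueFilterExecuterImpl and RowLevelRange* hunks below. A self-contained sketch of the pattern, with hypothetical names:

  public class MinMaxPrune {
    // hypothetical sketch: prune only when min/max statistics exist for the column
    static boolean isScanRequired(byte[][] blockMax, byte[][] blockMin, int columnIndex,
        byte[] filterValue) {
      if (columnIndex >= blockMax.length) {
        return true; // no statistics recorded for this column, scanning is the safe default
      }
      // scan when the filter value can fall inside [min, max] of the block
      return compare(blockMin[columnIndex], filterValue) <= 0
          && compare(filterValue, blockMax[columnIndex]) <= 0;
    }

    // unsigned lexicographic byte comparison
    static int compare(byte[] a, byte[] b) {
      int len = Math.min(a.length, b.length);
      for (int i = 0; i < len; i++) {
        int cmp = (a[i] & 0xff) - (b[i] & 0xff);
        if (cmp != 0) {
          return cmp;
        }
      }
      return a.length - b.length;
    }
  }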
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index 6823531..c2e077e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -287,7 +287,7 @@ public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl {
BitSet bitSet = new BitSet(1);
byte[][] filterValues = this.filterRangesValues;
int columnIndex = this.dimColEvaluatorInfo.getColumnIndex();
- boolean isScanRequired =
+ boolean isScanRequired = columnIndex >= blockMinValue.length ||
isScanRequired(blockMinValue[columnIndex], blockMaxValue[columnIndex], filterValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index be82be7..73352cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -79,7 +79,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 53da6c5..6e8e188 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index d694960..d6f7c86 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index b3dd921..597ba52 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -82,7 +82,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index fdb5483..ff4f5dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -165,6 +165,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader);
blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+ if (blocksChunkHolder.getDataBlock().getColumnsMaxValue() == null) {
+ return blocksChunkHolder;
+ }
if (blockletScanner.isScanRequired(blocksChunkHolder)) {
return blocksChunkHolder;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 92e9594..95030d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
import org.apache.carbondata.core.mutate.DeleteDeltaVo;
import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -127,20 +128,27 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
// set the deleted row to block execution info
blockInfo.setDeletedRecordsMap(deletedRowsMap);
}
- DataRefNode startDataBlock = finder
- .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
- while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
- startDataBlock = startDataBlock.getNextDataRefNode();
- }
- long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
- //if the number of blocks to scan is not positive then take the end block.
- if (numberOfBlockToScan <= 0) {
- DataRefNode endDataBlock = finder
- .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
- numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode();
+ if (dataRefNode instanceof BlockletDataRefNodeWrapper) {
+ BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) dataRefNode;
+ blockInfo.setFirstDataBlock(wrapper);
+ blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes());
+
+ } else {
+ DataRefNode startDataBlock =
+ finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey());
+ while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
+ startDataBlock = startDataBlock.getNextDataRefNode();
+ }
+ long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
+ //if the number of blocks to scan is not positive then take the end block.
+ if (numberOfBlockToScan <= 0) {
+ DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, blockInfo.getEndKey());
+ numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ }
+ blockInfo.setFirstDataBlock(startDataBlock);
+ blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
}
- blockInfo.setFirstDataBlock(startDataBlock);
- blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
}
}
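When the data ref node comes from the blocklet DataMap, the wrapper already knows exactly which nodes to scan, so the BTree start/end search is skipped. The fallback path still walks the linked nodes; a compact sketch of that walk, using a hypothetical Node interface in place of DataRefNode:

  // hypothetical node abstraction standing in for DataRefNode
  interface Node {
    long nodeNumber();
    Node next();
  }

  public class ScanRange {
    // BTree fallback path: walk forward to the first requested blocklet
    static Node findStart(Node first, long startBlockletIndex) {
      Node current = first;
      while (current.nodeNumber() < startBlockletIndex) {
        current = current.next();
      }
      return current;
    }

    // number of nodes to scan once the start and end nodes are known
    static long blocksToScan(Node start, Node end) {
      return end.nodeNumber() - start.nodeNumber() + 1;
    }
  }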
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 97b1a1f..34c7709 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -122,6 +122,57 @@ public abstract class AbstractDataFileFooterConverter {
}
/**
+ * Below method will be used to get the index info from the index file
+ *
+ * @param filePath file path of the index file
+ * @return list of index info
+ * @throws IOException problem while reading the index file
+ */
+ public List<DataFileFooter> getIndexInfo(String filePath) throws IOException {
+ CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+ String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
+ try {
+ // open the reader
+ indexReader.openThriftReader(filePath);
+ // get the index header
+ org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns =
+ readIndexHeader.getTable_columns();
+ for (int i = 0; i < table_columns.size(); i++) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+ }
+ // get the segment info
+ SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+ BlockletIndex blockletIndex = null;
+ DataFileFooter dataFileFooter = null;
+ // read the block info from file
+ while (indexReader.hasNext()) {
+ BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+ blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
+ dataFileFooter = new DataFileFooter();
+ TableBlockInfo tableBlockInfo = new TableBlockInfo();
+ tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
+ tableBlockInfo.setVersion(
+ ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion()));
+ int blockletSize = getBlockletSize(readBlockIndexInfo);
+ tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
+ tableBlockInfo.setFilePath(parentPath + "/" + readBlockIndexInfo.file_name);
+ dataFileFooter.setBlockletIndex(blockletIndex);
+ dataFileFooter.setColumnInTable(columnSchemaList);
+ dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+ dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
+ dataFileFooter.setSegmentInfo(segmentInfo);
+ dataFileFooters.add(dataFileFooter);
+ }
+ } finally {
+ indexReader.closeThriftReader();
+ }
+ return dataFileFooters;
+ }
+
+ /**
* the method returns the number of blocklets in a block
*
* @param readBlockIndexInfo
@@ -148,6 +199,8 @@ public abstract class AbstractDataFileFooterConverter {
public abstract DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
throws IOException;
+ public abstract List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException;
+
/**
* Below method will be used to get blocklet index for data file meta
*
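The new getIndexInfo builds one lightweight DataFileFooter per BlockIndex entry of a .carbonindex file, so block metadata can be loaded without opening the data files. A hedged usage sketch (the index file path is made up, and the getters on DataFileFooter are assumed to mirror the setters used above):

  import java.util.List;
  import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
  import org.apache.carbondata.core.util.DataFileFooterConverterV3;

  public class ReadIndexInfo {
    public static void main(String[] args) throws Exception {
      // segment index file path below is made up for illustration
      List<DataFileFooter> footers = new DataFileFooterConverterV3()
          .getIndexInfo("/store/default/t1/Fact/Part0/Segment_0/0_batchno0-0-123.carbonindex");
      for (DataFileFooter footer : footers) {
        System.out.println("block rows=" + footer.getNumberOfRows());
      }
    }
  }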
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ac45da1..6dd211a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -54,10 +54,13 @@ import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -914,10 +917,26 @@ public final class CarbonUtil {
* Below method will be used to read the data file metadata
*/
public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) throws IOException {
- AbstractDataFileFooterConverter fileFooterConverter =
- DataFileFooterConverterFactory.getInstance()
- .getDataFileFooterConverter(tableBlockInfo.getVersion());
- return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+ BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo();
+ if (detailInfo == null) {
+ AbstractDataFileFooterConverter fileFooterConverter =
+ DataFileFooterConverterFactory.getInstance()
+ .getDataFileFooterConverter(tableBlockInfo.getVersion());
+ return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+ } else {
+ DataFileFooter fileFooter = new DataFileFooter();
+ fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp());
+ ColumnarFormatVersion version =
+ ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber());
+ AbstractDataFileFooterConverter dataFileFooterConverter =
+ DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
+ fileFooter.setColumnInTable(dataFileFooterConverter.getSchema(tableBlockInfo));
+ SegmentInfo segmentInfo = new SegmentInfo();
+ segmentInfo.setColumnCardinality(detailInfo.getDimLens());
+ segmentInfo.setNumberOfColumns(detailInfo.getRowCount());
+ fileFooter.setSegmentInfo(segmentInfo);
+ return fileFooter;
+ }
}
/**
@@ -1555,24 +1574,23 @@ public final class CarbonUtil {
}
/**
- * @param tableInfo
* @param invalidBlockVOForSegmentId
* @param updateStatusMngr
* @return
*/
- public static boolean isInvalidTableBlock(TableBlockInfo tableInfo,
+ public static boolean isInvalidTableBlock(String segmentId, String filePath,
UpdateVO invalidBlockVOForSegmentId, SegmentUpdateStatusManager updateStatusMngr) {
- if (!updateStatusMngr.isBlockValid(tableInfo.getSegmentId(),
- CarbonTablePath.getCarbonDataFileName(tableInfo.getFilePath()) + CarbonTablePath
+ if (!updateStatusMngr.isBlockValid(segmentId,
+ CarbonTablePath.getCarbonDataFileName(filePath) + CarbonTablePath
.getCarbonDataExtension())) {
return true;
}
if (null != invalidBlockVOForSegmentId) {
- Long blockTimeStamp = Long.parseLong(tableInfo.getFilePath()
- .substring(tableInfo.getFilePath().lastIndexOf('-') + 1,
- tableInfo.getFilePath().lastIndexOf('.')));
+ Long blockTimeStamp = Long.parseLong(filePath
+ .substring(filePath.lastIndexOf('-') + 1,
+ filePath.lastIndexOf('.')));
if ((blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp() && (
invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp() != null
&& blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) {
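The refactored isInvalidTableBlock now needs only the segment id and file path, and it derives the block timestamp from the digits between the last '-' and the last '.' of the file name. A tiny self-contained check of that parse (the file name is hypothetical):

  public class BlockTimestamp {
    public static void main(String[] args) {
      String filePath = "/store/t1/Fact/Part0/Segment_0/part-0-0_batchno0-0-1501142400000.carbondata";
      // keep the digits between the last '-' and the last '.'
      long blockTimeStamp = Long.parseLong(
          filePath.substring(filePath.lastIndexOf('-') + 1, filePath.lastIndexOf('.')));
      System.out.println(blockTimeStamp); // prints 1501142400000
    }
  }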
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 0f82b95..3ac6987 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -121,4 +121,8 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
return blockletInfo;
}
+
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index 4882b0f..8cd437f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -140,4 +140,7 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
return numberOfDimensionColumns;
}
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 143c1b1..ccb8b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -85,6 +85,17 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
return dataFileFooter;
}
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(tableBlockInfo.getFilePath());
+ FileHeader fileHeader = carbonHeaderReader.readHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+ for (int i = 0; i < table_columns.size(); i++) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+ }
+ return columnSchemaList;
+ }
+
/**
* Below method is to convert the blocklet info of the thrift to wrapper
* blocklet info
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index c055031..4df085a 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -41,4 +41,5 @@ struct BlockIndex{
2: required string file_name; // Block file name
3: required i64 offset; // Offset of the footer
4: required carbondata.BlockletIndex block_index; // Blocklet index
+ 5: optional carbondata.BlockletInfo3 blocklet_info;
}
\ No newline at end of file
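Declaring blocklet_info as optional keeps previously written index files readable; old files simply leave the field unset. A hedged sketch of how a reader might probe for it (the accessor names follow Thrift's usual Java codegen convention and are assumptions here):

  org.apache.carbondata.format.BlockIndex blockIndex = indexReader.readBlockIndexInfo();
  if (blockIndex.isSetBlocklet_info()) {
    // new-format index: blocklet-level metadata is available inline
    org.apache.carbondata.format.BlockletInfo3 info = blockIndex.getBlocklet_info();
    // ... build blocklet detail from info ...
  } else {
    // old-format index: fall back to reading the data file footer
  }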
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index dd91446..4bebf21 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -21,7 +21,14 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.DataRefNode;
@@ -390,8 +397,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
if (isIUDTable) {
// In case IUD is not performed on this table, avoid searching for
// invalidated blocks.
- if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
- updateStatusManager)) {
+ if (CarbonUtil
+ .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
continue;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 1cdbb26..e263aed 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.BlockletInfos;
import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.util.ByteUtil;
@@ -78,6 +79,8 @@ public class CarbonInputSplit extends FileSplit
*/
private String[] deleteDeltaFiles;
+ private BlockletDetailInfo detailInfo;
+
public CarbonInputSplit() {
segmentId = null;
taskId = "0";
@@ -139,10 +142,12 @@ public class CarbonInputSplit extends FileSplit
BlockletInfos blockletInfos =
new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
try {
- tableBlockInfoList.add(
+ TableBlockInfo blockInfo =
new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
- split.getDeleteDeltaFiles()));
+ split.getDeleteDeltaFiles());
+ blockInfo.setDetailInfo(split.getDetailInfo());
+ tableBlockInfoList.add(blockInfo);
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + split, e);
}
@@ -154,9 +159,12 @@ public class CarbonInputSplit extends FileSplit
BlockletInfos blockletInfos =
new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
try {
- return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
- inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
- blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+ TableBlockInfo blockInfo =
+ new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
+ inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
+ blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+ blockInfo.setDetailInfo(inputSplit.getDetailInfo());
+ return blockInfo;
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + inputSplit, e);
}
@@ -181,6 +189,11 @@ public class CarbonInputSplit extends FileSplit
for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
deleteDeltaFiles[i] = in.readUTF();
}
+ boolean detailInfoExists = in.readBoolean();
+ if (detailInfoExists) {
+ detailInfo = new BlockletDetailInfo();
+ detailInfo.readFields(in);
+ }
}
@Override public void write(DataOutput out) throws IOException {
@@ -198,6 +211,10 @@ public class CarbonInputSplit extends FileSplit
out.writeUTF(deleteDeltaFiles[i]);
}
}
+ out.writeBoolean(detailInfo != null);
+ if (detailInfo != null) {
+ detailInfo.write(out);
+ }
}
public List<String> getInvalidSegments() {
@@ -313,4 +330,16 @@ public class CarbonInputSplit extends FileSplit
public String[] getDeleteDeltaFiles() {
return deleteDeltaFiles;
}
+
+ public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
+ this.deleteDeltaFiles = deleteDeltaFiles;
+ }
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
}
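The readFields/write changes follow the standard Hadoop Writable idiom for an optional nested field: write a boolean presence flag, then the payload only when present. A self-contained sketch of the pattern (class and field names are illustrative, not the actual Carbon classes):

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import org.apache.hadoop.io.Writable;

  public class OptionalFieldSplit implements Writable {
    private BlockletDetail detail; // may legitimately be null

    @Override public void write(DataOutput out) throws IOException {
      out.writeBoolean(detail != null); // presence flag first
      if (detail != null) {
        detail.write(out);
      }
    }

    @Override public void readFields(DataInput in) throws IOException {
      if (in.readBoolean()) {
        detail = new BlockletDetail();
        detail.readFields(in);
      } else {
        detail = null;
      }
    }

    // illustrative nested Writable
    public static class BlockletDetail implements Writable {
      private long rowCount;
      @Override public void write(DataOutput out) throws IOException {
        out.writeLong(rowCount);
      }
      @Override public void readFields(DataInput in) throws IOException {
        rowCount = in.readLong();
      }
    }
  }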
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ae9c676..e73c04a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -18,152 +18,556 @@
package org.apache.carbondata.hadoop.api;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.DataMapStoreManager;
+import org.apache.carbondata.core.indexstore.DataMapType;
+import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.internal.CarbonInputSplit;
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManager;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManagerFactory;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.StringUtils;
/**
* Input format of CarbonData file.
+ *
* @param <T>
*/
public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
+ // comma separated list of input segment numbers
+ public static final String INPUT_SEGMENT_NUMBERS =
+ "mapreduce.input.carboninputformat.segmentnumbers";
+ // comma separated list of input files
+ public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+ private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
+ private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+ private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+ private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
- private SegmentManager segmentManager;
+ /**
+ * It is optional; if the user does not set it, the table schema is read from the store
+ *
+ * @param configuration
+ * @param carbonTable
+ * @throws IOException
+ */
+ public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+ throws IOException {
+ if (null != carbonTable) {
+ configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+ }
+ }
- public CarbonTableInputFormat() {
- this.segmentManager = SegmentManagerFactory.getGlobalSegmentManager();
+ public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
+ String carbonTableStr = configuration.get(CARBON_TABLE);
+ if (carbonTableStr == null) {
+ populateCarbonTable(configuration);
+ // read it from schema file in the store
+ carbonTableStr = configuration.get(CARBON_TABLE);
+ return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+ }
+ return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
}
- @Override
- public RecordReader<Void, T> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- switch (((CarbonInputSplit)split).formatType()) {
- case COLUMNAR:
- // TODO: create record reader for columnar format
- break;
- default:
- throw new RuntimeException("Unsupported format type");
+ /**
+ * This method reads the schema from the physical file and populates it into CARBON_TABLE
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ private static void populateCarbonTable(Configuration configuration) throws IOException {
+ String dirs = configuration.get(INPUT_DIR, "");
+ String[] inputPaths = StringUtils.split(dirs);
+ if (inputPaths.length == 0) {
+ throw new InvalidPathException("No input paths specified in job");
}
- return null;
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+ // read the schema file to get the absoluteTableIdentifier having the correct table id
+ // persisted in the schema
+ CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+ setCarbonTable(configuration, carbonTable);
}
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
+ public static void setTablePath(Configuration configuration, String tablePath)
+ throws IOException {
+ configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+ }
- // work as following steps:
- // get all current valid segment
- // for each segment, get all input split
+ /**
+ * It sets unresolved filter expression.
+ *
+ * @param configuration
+ * @param filterExpression
+ */
+ public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+ if (filterExpression == null) {
+ return;
+ }
+ try {
+ String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+ configuration.set(FILTER_PREDICATE, filterString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting filter expression to Job", e);
+ }
+ }
- List<InputSplit> output = new LinkedList<>();
- Expression filter = getFilter(job.getConfiguration());
- Segment[] segments = segmentManager.getAllValidSegments();
- FilterResolverIntf filterResolver = CarbonInputFormatUtil.resolveFilter(filter, null);
- for (Segment segment: segments) {
- List<InputSplit> splits = segment.getSplits(job, filterResolver);
- output.addAll(splits);
+ public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+ if (projection == null || projection.isEmpty()) {
+ return;
+ }
+ String[] allColumns = projection.getAllColumns();
+ StringBuilder builder = new StringBuilder();
+ for (String column : allColumns) {
+ builder.append(column).append(",");
}
- return output;
+ String columnString = builder.toString();
+ columnString = columnString.substring(0, columnString.length() - 1);
+ configuration.set(COLUMN_PROJECTION, columnString);
}
- /**
- * set the table path into configuration
- * @param conf configuration of the job
- * @param tablePath table path string
- */
- public void setTablePath(Configuration conf, String tablePath) {
+ public static String getColumnProjection(Configuration configuration) {
+ return configuration.get(COLUMN_PROJECTION);
+ }
+
+ public static void setCarbonReadSupport(Configuration configuration,
+ Class<? extends CarbonReadSupport> readSupportClass) {
+ if (readSupportClass != null) {
+ configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
+ }
+ }
+ private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
+ return CarbonStorePath.getCarbonTablePath(absIdentifier);
}
/**
- * return the table path in the configuration
- * @param conf configuration of the job
- * @return table path string
+ * Set list of segments to access
*/
- public String getTablePath(Configuration conf) {
- return null;
+ public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+ configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
}
/**
- * set projection columns into configuration
- * @param conf configuration of the job
- * @param projection projection
+ * Set list of files to access
*/
- public void setProjection(Configuration conf, CarbonProjection projection) {
+ public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
+ configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+ }
+ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ throws IOException {
+ return getCarbonTable(configuration).getAbsoluteTableIdentifier();
}
/**
- * return the projection in the configuration
- * @param conf configuration of the job
- * @return projection
+ * {@inheritDoc}
+ * Configurations FileInputFormat.INPUT_DIR
+ * are used to get the table path to read.
+ *
+ * @param job
+ * @return List<InputSplit> list of CarbonInputSplit
+ * @throws IOException
*/
- public CarbonProjection getProjection(Configuration conf) {
- return null;
+ @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
+ AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+ TableDataMap blockletMap =
+ DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ List<String> invalidSegments = new ArrayList<>();
+ List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+ List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
+ // get all valid segments and set them into the configuration
+ if (validSegments.size() == 0) {
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+ segmentStatusManager.getValidAndInvalidSegments();
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+ validSegments = segments.getValidSegments();
+ if (validSegments.size() == 0) {
+ return new ArrayList<>(0);
+ }
+
+ // remove entry in the segment index if there are invalid segments
+ invalidSegments.addAll(segments.getInvalidSegments());
+ for (String invalidSegmentId : invalidSegments) {
+ invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+ }
+ if (invalidSegments.size() > 0) {
+ List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+ new ArrayList<>(invalidSegments.size());
+ for (String segId : invalidSegments) {
+ invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+ }
+ blockletMap.clear(invalidSegments);
+ }
+ }
+
+ // process and resolve the expression
+ Expression filter = getFilterPredicates(job.getConfiguration());
+ CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+ // this will be null in case of corrupt schema file.
+ if (null == carbonTable) {
+ throw new IOException("Missing/Corrupt schema file for table.");
+ }
+
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+
+ // prune partitions for filter query on partition table
+ BitSet matchedPartitions = null;
+ if (null != filter) {
+ PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+ if (null != partitionInfo) {
+ Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
+ matchedPartitions = new FilterExpressionProcessor()
+ .getFilteredPartitions(filter, partitionInfo, partitioner);
+ if (matchedPartitions.cardinality() == 0) {
+ // no partition is required
+ return new ArrayList<InputSplit>();
+ }
+ if (matchedPartitions.cardinality() == partitioner.numPartitions()) {
+ // all partitions are required, no need to prune partitions
+ matchedPartitions = null;
+ }
+ }
+ }
+
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+
+ // do block filtering and get split
+ List<InputSplit> splits = getSplits(job, filterInterface, validSegments, matchedPartitions);
+ // pass the invalid segment to task side in order to remove index entry in task side
+ if (invalidSegments.size() > 0) {
+ for (InputSplit split : splits) {
+ ((org.apache.carbondata.hadoop.CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+ ((org.apache.carbondata.hadoop.CarbonInputSplit) split)
+ .setInvalidTimestampRange(invalidTimestampsList);
+ }
+ }
+ return splits;
}
/**
- * set filter expression into the configuration
- * @param conf configuration of the job
- * @param filter filter expression
+ * {@inheritDoc}
+ * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS
+ * are used to get the table path to read.
+ *
+ * @return
+ * @throws IOException
*/
- public void setFilter(Configuration conf, Expression filter) {
+ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+ List<String> validSegments, BitSet matchedPartitions) throws IOException {
+
+ List<InputSplit> result = new LinkedList<InputSplit>();
+ UpdateVO invalidBlockVOForSegmentId = null;
+ Boolean isIUDTable = false;
+
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+ isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+ //for each segment fetch blocks matching filter in Driver BTree
+ List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
+ getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+ validSegments);
+ for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+ // Get the UpdateVO for those tables on which IUD operations being performed.
+ if (isIUDTable) {
+ invalidBlockVOForSegmentId =
+ updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+ }
+ if (isIUDTable) {
+ // In case IUD is not performed on this table, avoid searching for
+ // invalidated blocks.
+ if (CarbonUtil
+ .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
+ continue;
+ }
+ }
+ String[] deleteDeltaFilePath = null;
+ try {
+ deleteDeltaFilePath =
+ updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+ result.add(inputSplit);
+ }
+ return result;
+ }
+
+ protected Expression getFilterPredicates(Configuration configuration) {
try {
- String filterString = ObjectSerializationUtil.convertObjectToString(filter);
- conf.set(FILTER_PREDICATE, filterString);
- } catch (Exception e) {
- throw new RuntimeException("Error while setting filter expression to Job", e);
+ String filterExprString = configuration.get(FILTER_PREDICATE);
+ if (filterExprString == null) {
+ return null;
+ }
+ Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+ return (Expression) filter;
+ } catch (IOException e) {
+ throw new RuntimeException("Error while reading filter expression", e);
}
}
/**
- * return filter expression in the configuration
- * @param conf configuration of the job
- * @return filter expression
+ * get data blocks of the given segments
*/
- public Expression getFilter(Configuration conf) {
- Object filter;
- String filterExprString = conf.get(FILTER_PREDICATE);
- if (filterExprString == null) {
- return null;
+ private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+ AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+ BitSet matchedPartitions, List<String> segmentIds) throws IOException {
+
+ QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+ QueryStatistic statistic = new QueryStatistic();
+
+ // get tokens for all the required FileSystem for table path
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+
+ TableDataMap blockletMap = DataMapStoreManager.getInstance()
+ .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+ List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
+
+ List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+ for (Blocklet blocklet : prunedBlocklets) {
+ int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+ CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
+
+ // the matchedPartitions variable will be null in two cases:
+ // 1. the table is not a partition table
+ // 2. the table is a partition table, and all partitions are matched by the query
+ // for a partition table, the task id in the carbondata file name is the partition id;
+ // if this partition is not required, it is skipped here.
+ if (matchedPartitions == null || matchedPartitions.get(taskId)) {
+ resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet));
+ }
}
+ statistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+ recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+ return resultFilterredBlocks;
+ }
+
+ private org.apache.carbondata.hadoop.CarbonInputSplit convertToCarbonInputSplit(Blocklet blocklet)
+ throws IOException {
+ blocklet.updateLocations();
+ org.apache.carbondata.hadoop.CarbonInputSplit split =
+ org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
+ new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()),
+ ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
+ split.setDetailInfo(blocklet.getDetailInfo());
+ return split;
+ }
+
+ @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
+ CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+ return new CarbonRecordReader<T>(queryModel, readSupport);
+ }
+
+ public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ CarbonTable carbonTable = getCarbonTable(configuration);
+ // getting the table absoluteTableIdentifier from the carbonTable
+ // to avoid unnecessary deserialization
+ AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+
+ // query plan includes projection column
+ String projection = getColumnProjection(configuration);
+ CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
+ QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+
+ // set the filter to the query model in order to filter blocklet before scan
+ Expression filter = getFilterPredicates(configuration);
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+ FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+ queryModel.setFilterExpressionResolverTree(filterIntf);
+
+ // update the file level index store if there are invalid segments
+ if (inputSplit instanceof CarbonMultiBlockSplit) {
+ CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+ List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+ if (invalidSegments.size() > 0) {
+ queryModel.setInvalidSegmentIds(invalidSegments);
+ }
+ List<UpdateVO> invalidTimestampRangeList =
+ split.getAllSplits().get(0).getInvalidTimestampRange();
+ if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+ queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+ }
+ }
+ return queryModel;
+ }
+
+ public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+ String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+ // By default it uses the dictionary decode read support class
+ CarbonReadSupport<T> readSupport = null;
+ if (readSupportClass != null) {
+ try {
+ Class<?> myClass = Class.forName(readSupportClass);
+ Constructor<?> constructor = myClass.getConstructors()[0];
+ Object object = constructor.newInstance();
+ if (object instanceof CarbonReadSupport) {
+ readSupport = (CarbonReadSupport) object;
+ }
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Class " + readSupportClass + "not found", ex);
+ } catch (Exception ex) {
+ LOG.error("Error while creating " + readSupportClass, ex);
+ }
+ } else {
+ readSupport = new DictionaryDecodeReadSupport<>();
+ }
+ return readSupport;
+ }
+
+ @Override protected boolean isSplitable(JobContext context, Path filename) {
try {
- filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
- } catch (IOException e) {
- throw new RuntimeException("Error while reading filter expression", e);
+ // Don't split the file if it is local file system
+ FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+ if (fileSystem instanceof LocalFileSystem) {
+ return false;
+ }
+ } catch (Exception e) {
+ return true;
+ }
+ return true;
+ }
+
+ /**
+ * required to be moved to core
+ *
+ * @return updateExtension
+ */
+ private String getUpdateExtension() {
+ // TODO: required to modify when supporting update, mostly will be update timestamp
+ return "update";
+ }
+
+ /**
+ * return valid segments to access
+ */
+ private String[] getSegmentsToAccess(JobContext job) {
+ String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+ if (segmentString.trim().isEmpty()) {
+ return new String[0];
}
- assert (filter instanceof Expression);
- return (Expression) filter;
+ return segmentString.split(",");
}
/**
- * Optional API. It can be used by query optimizer to select index based on filter
- * in the configuration of the job. After selecting index internally, index' name will be set
- * in the configuration.
+ * Get the row count of each block and the block count of each segment.
*
- * The process of selection is simple, just use the default index. Subclass can provide a more
- * advanced selection logic like cost based.
- * @param conf job configuration
+ * @param job
+ * @param identifier
+ * @return
+ * @throws IOException
+ * @throws KeyGenException
*/
- public void selectIndex(Configuration conf) {
- // set the default index in configuration
+ public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
+ throws IOException, KeyGenException {
+ TableDataMap blockletMap =
+ DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
+ new SegmentStatusManager(identifier).getValidAndInvalidSegments();
+ Map<String, Long> blockRowCountMapping =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ Map<String, Long> segmentAndBlockCountMapping =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(), null);
+ for (Blocklet blocklet : blocklets) {
+ String blockName = blocklet.getPath().toString();
+ blockName = CarbonTablePath.getCarbonDataFileName(blockName);
+ blockName = blockName + CarbonTablePath.getCarbonDataExtension();
+
+ long rowCount = blocklet.getDetailInfo().getRowCount();
+
+ String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName);
+
+ // if the block is invalid then don't add the count
+ SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+ if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+ Long blockCount = blockRowCountMapping.get(key);
+ if (blockCount == null) {
+ blockCount = 0L;
+ Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId());
+ if (count == null) {
+ count = 0L;
+ }
+ segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1);
+ }
+ blockCount += rowCount;
+ blockRowCountMapping.put(key, blockCount);
+ }
+ }
+ return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
}
}
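Putting the static setters above together, driver-side split planning could look like the following sketch (the store path and segment ids are placeholders, and only methods visible in this diff are used):

  import java.util.Arrays;
  import java.util.List;
  import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hadoop.mapreduce.Job;

  public class SplitPlanning {
    public static void main(String[] args) throws Exception {
      Job job = Job.getInstance(new Configuration());
      // the table path doubles as the input directory
      CarbonTableInputFormat.setTablePath(job.getConfiguration(), "/store/default/t1");
      // optional: restrict the read to specific segments
      CarbonTableInputFormat.setSegmentsToAccess(job.getConfiguration(), Arrays.asList("0", "1"));
      // filter and projection pushdown would be configured here via
      // setFilterPredicates(...) and setColumnProjection(...)
      List<InputSplit> splits = new CarbonTableInputFormat<Object>().getSplits(job);
      System.out.println("planned " + splits.size() + " splits");
    }
  }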
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6812449/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 8270304..8269757 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryDimension;
import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.hadoop.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -77,9 +77,10 @@ public class CarbonInputFormatUtil {
return plan;
}
- public static <V> CarbonInputFormat<V> createCarbonInputFormat(AbsoluteTableIdentifier identifier,
+ public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
+ AbsoluteTableIdentifier identifier,
Job job) throws IOException {
- CarbonInputFormat<V> carbonInputFormat = new CarbonInputFormat<>();
+ CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
return carbonInputFormat;
}
[7/7] carbondata git commit: Rebase datamap branch onto master
Posted by ja...@apache.org.
Rebase datamap branch onto master
This closes #1196
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/79feac96
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/79feac96
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/79feac96
Branch: refs/heads/master
Commit: 79feac96ae789851c5ad7306a7acaaba25d8e6c9
Parents: b681244
Author: Raghunandan S <ra...@gmail.com>
Authored: Thu Jul 27 20:38:48 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Jul 27 20:41:22 2017 +0800
----------------------------------------------------------------------
.../core/indexstore/UnsafeMemoryDMStore.java | 31 +-
.../blockletindex/BlockletDataMap.java | 11 +-
.../core/indexstore/row/DataMapRow.java | 4 +-
.../core/indexstore/row/DataMapRowImpl.java | 4 +
.../core/indexstore/row/UnsafeDataMapRow.java | 40 +-
.../core/memory/UnsafeMemoryManager.java | 19 +-
.../datatype/DecimalConverterFactory.java | 2 +-
.../hadoop/api/CarbonTableInputFormat.java | 83 ++--
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 +-
.../execution/command/carbonTableSchema.scala | 6 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 392 +++++++++----------
11 files changed, 301 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 8246f99..13951dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -19,9 +19,10 @@ package org.apache.carbondata.core.indexstore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
-import org.apache.carbondata.core.memory.MemoryAllocator;
-import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
@@ -39,8 +40,6 @@ public class UnsafeMemoryDMStore {
private int runningLength;
- private MemoryAllocator memoryAllocator;
-
private boolean isMemoryFreed;
private DataMapSchema[] schema;
@@ -49,11 +48,13 @@ public class UnsafeMemoryDMStore {
private int rowCount;
- public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+ private final long taskId = null != ThreadLocalTaskInfo.getCarbonTaskInfo() ?
+ ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId() : System.nanoTime();
+
+ public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException {
this.schema = schema;
- this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
this.allocatedSize = capacity;
- this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+ this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
this.pointers = new int[1000];
}
@@ -63,13 +64,13 @@ public class UnsafeMemoryDMStore {
*
* @param rowSize
*/
- private void ensureSize(int rowSize) {
+ private void ensureSize(int rowSize) throws MemoryException {
if (runningLength + rowSize >= allocatedSize) {
MemoryBlock allocate =
- MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+ UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity);
unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
- memoryAllocator.free(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
allocatedSize = allocatedSize + capacity;
memoryBlock = allocate;
}
@@ -86,7 +87,7 @@ public class UnsafeMemoryDMStore {
* @param indexRow
* @return
*/
- public void addIndexRowToUnsafe(DataMapRow indexRow) {
+ public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
// First calculate the required memory to keep the row in unsafe
int rowSize = indexRow.getTotalSizeInBytes();
// Check whether allocated memory is sufficient or not.
@@ -168,13 +169,13 @@ public class UnsafeMemoryDMStore {
return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
}
- public void finishWriting() {
+ public void finishWriting() throws MemoryException {
if (runningLength < allocatedSize) {
MemoryBlock allocate =
- MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+ UnsafeMemoryManager.allocateMemoryWithRetry(taskId, runningLength);
unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
- memoryAllocator.free(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
memoryBlock = allocate;
}
// Compact pointers.
@@ -187,7 +188,7 @@ public class UnsafeMemoryDMStore {
public void freeMemory() {
if (!isMemoryFreed) {
- memoryAllocator.free(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
isMemoryFreed = true;
}
}
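The ensureSize/finishWriting changes above follow a grow-and-copy discipline: allocate a bigger block, copy the live bytes, free the old block. A generic sketch of the same pattern over a plain byte[], without the unsafe-memory and task-tracking machinery:
class GrowableBuffer {
  private byte[] block = new byte[8 * 1024];
  private int runningLength;
  void ensureSize(int rowSize) {
    if (runningLength + rowSize >= block.length) {
      byte[] bigger = new byte[block.length * 2];           // allocate a larger block
      System.arraycopy(block, 0, bigger, 0, runningLength); // copy only the live bytes
      block = bigger;                                       // the old block is now free
    }
  }
  void shrinkToFit() {
    // mirror of finishWriting(): trim the allocation down to what was actually used
    byte[] exact = new byte[runningLength];
    System.arraycopy(block, 0, exact, 0, runningLength);
    block = exact;
  }
}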
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 79aa091..680852d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -110,7 +111,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
if (unsafeMemoryDMStore != null) {
unsafeMemoryDMStore.finishWriting();
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -156,11 +157,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
DataOutput dataOutput = new DataOutputStream(stream);
blockletInfo.write(dataOutput);
serializedData = stream.toByteArray();
- } catch (IOException e) {
+ row.setByteArray(serializedData, ordinal);
+ unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ } catch (Exception e) {
throw new RuntimeException(e);
}
- row.setByteArray(serializedData, ordinal);
- unsafeMemoryDMStore.addIndexRowToUnsafe(row);
}
}
@@ -176,7 +177,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
return minRow;
}
- private void createSchema(SegmentProperties segmentProperties) {
+ private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
List<DataMapSchema> indexSchemas = new ArrayList<>();
// Index key
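The restructured hunk above serializes a BlockletInfo into the index row by writing it through a DataOutputStream. Assuming only a Writable-style write(DataOutput) method, that serialization step reduces to this generic helper:
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
final class WritableBytes {
  static byte[] toBytes(Writable value) throws IOException {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(stream)) {
      value.write(out); // the object writes its own fields in order
    }
    return stream.toByteArray();
  }
}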
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index defe766..631e0ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -62,6 +62,8 @@ public abstract class DataMapRow {
public abstract double getDouble(int ordinal);
+ public abstract int getLengthInBytes(int ordinal);
+
public int getTotalSizeInBytes() {
int len = 0;
for (int i = 0; i < schemas.length; i++) {
@@ -75,7 +77,7 @@ public abstract class DataMapRow {
case FIXED:
return schemas[ordinal].getLength();
case VARIABLE:
- return getByteArray(ordinal).length + 2;
+ return getLengthInBytes(ordinal) + 2;
case STRUCT:
return getRow(ordinal).getTotalSizeInBytes();
default:
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index adec346..32d15d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -35,6 +35,10 @@ public class DataMapRowImpl extends DataMapRow {
return (byte[]) data[ordinal];
}
+ @Override public int getLengthInBytes(int ordinal) {
+ return ((byte[]) data[ordinal]).length;
+ }
+
@Override public DataMapRow getRow(int ordinal) {
return (DataMapRow) data[ordinal];
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index ef78514..c398115 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -55,6 +55,31 @@ public class UnsafeDataMapRow extends DataMapRow {
return data;
}
+ @Override public int getLengthInBytes(int ordinal) {
+ int length;
+ int position = getPosition(ordinal);
+ switch (schemas[ordinal].getSchemaType()) {
+ case VARIABLE:
+ length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+ break;
+ default:
+ length = schemas[ordinal].getLength();
+ }
+ return length;
+ }
+
+ private int getLengthInBytes(int ordinal, int position) {
+ int length;
+ switch (schemas[ordinal].getSchemaType()) {
+ case VARIABLE:
+ length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+ break;
+ default:
+ length = schemas[ordinal].getLength();
+ }
+ return length;
+ }
+
@Override public DataMapRow getRow(int ordinal) {
DataMapSchema[] childSchemas =
((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
@@ -123,10 +148,23 @@ public class UnsafeDataMapRow extends DataMapRow {
throw new UnsupportedOperationException("Not supported to set on unsafe row");
}
+ private int getSizeInBytes(int ordinal, int position) {
+ switch (schemas[ordinal].getSchemaType()) {
+ case FIXED:
+ return schemas[ordinal].getLength();
+ case VARIABLE:
+ return getLengthInBytes(ordinal, position) + 2;
+ case STRUCT:
+ return getRow(ordinal).getTotalSizeInBytes();
+ default:
+ throw new UnsupportedOperationException("wrong type");
+ }
+ }
+
private int getPosition(int ordinal) {
int position = 0;
for (int i = 0; i < ordinal; i++) {
- position += getSizeInBytes(i);
+ position += getSizeInBytes(i, position);
}
return position;
}
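The getPosition fix above matters because variable-length fields are laid out as a 2-byte length prefix followed by the payload, so a field's offset depends on the actual sizes of every field before it, and reading those sizes requires the running offset rather than a fresh size lookup per ordinal. A byte[]-based sketch of the corrected computation (the real code reads the prefix via unsafe; big-endian is assumed here purely for illustration):
final class RowOffsets {
  // returns the stored size of field i, given where it starts in the row
  static int sizeAt(byte[] row, int position, boolean variable, int fixedLength) {
    if (!variable) {
      return fixedLength;
    }
    int length = ((row[position] & 0xFF) << 8) | (row[position + 1] & 0xFF);
    return length + 2; // payload plus the 2-byte length prefix
  }
  // offset of the field at `ordinal`: sum of the sizes of all earlier fields
  static int positionOf(byte[] row, boolean[] variable, int[] fixedLength, int ordinal) {
    int position = 0;
    for (int i = 0; i < ordinal; i++) {
      position += sizeAt(row, position, variable[i], fixedLength[i]);
    }
    return position;
  }
}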
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 991bc90..d433b5e 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -90,33 +90,28 @@ public class UnsafeMemoryManager {
if (memoryUsed + memoryRequested <= totalMemory) {
MemoryBlock allocate = allocator.allocate(memoryRequested);
memoryUsed += allocate.size();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Working Memory block (" + allocate + ") is created with size " + allocate.size()
- + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
- + "Bytes");
- }
Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
if (null == listOfMemoryBlock) {
listOfMemoryBlock = new HashSet<>();
taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
}
listOfMemoryBlock.add(allocate);
+ LOGGER.info("Memory block (" + allocate + ") is created with size " + allocate.size()
+ + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+ + "Bytes");
return allocate;
}
return null;
}
- public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) {
+ public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
allocator.free(memoryBlock);
memoryUsed -= memoryBlock.size();
memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + (
- totalMemory - memoryUsed));
- }
+ LOGGER.info(
+ "Freeing memory of size: " + memoryBlock.size() + "available memory: " + (totalMemory
+ - memoryUsed));
}
public void freeMemoryAll(long taskId) {
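The manager above keys every allocation by task id so that freeMemoryAll(taskId) can release a task's blocks even when the task dies without freeing them itself. A reduced sketch of that bookkeeping, with a plain Object standing in for MemoryBlock:
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
class TaskMemoryTracker {
  private final Map<Long, Set<Object>> taskIdToBlocks = new HashMap<>();
  private long memoryUsed;
  synchronized void track(long taskId, Object block, long size) {
    taskIdToBlocks.computeIfAbsent(taskId, k -> new HashSet<>()).add(block);
    memoryUsed += size;
  }
  synchronized void free(long taskId, Object block, long size) {
    Set<Object> blocks = taskIdToBlocks.get(taskId);
    if (blocks != null) {
      blocks.remove(block);
    }
    memoryUsed = Math.max(0, memoryUsed - size);
  }
  synchronized void freeAll(long taskId) {
    // release everything a finished (or failed) task still holds
    // (per-block size accounting omitted for brevity)
    taskIdToBlocks.remove(taskId);
  }
}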
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 555df1c..459eb24 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
*/
public final class DecimalConverterFactory {
- public static DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
+ public static final DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
private int[] minBytesForPrecision = minBytesForPrecision();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e73c04a..9e6e284 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
@@ -99,59 +100,56 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
- private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+ private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+ // a cache for the carbon table; it will be used on the task side
+ private CarbonTable carbonTable;
+
/**
- * It is optional, if user does not set then it reads from store
- *
- * @param configuration
- * @param carbonTable
- * @throws IOException
+ * Set the `tableInfo` in `configuration`
*/
- public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+ public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
throws IOException {
- if (null != carbonTable) {
- configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+ if (null != tableInfo) {
+ configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo));
}
}
- public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
- String carbonTableStr = configuration.get(CARBON_TABLE);
- if (carbonTableStr == null) {
- populateCarbonTable(configuration);
- // read it from schema file in the store
- carbonTableStr = configuration.get(CARBON_TABLE);
- return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
- }
- return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+ /**
+ * Get TableInfo object from `configuration`
+ */
+ private TableInfo getTableInfo(Configuration configuration) throws IOException {
+ String tableInfoStr = configuration.get(TABLE_INFO);
+ return (TableInfo) ObjectSerializationUtil.convertStringToObject(tableInfoStr);
}
/**
- * this method will read the schema from the physical file and populate into CARBON_TABLE
- *
- * @param configuration
- * @throws IOException
+ * Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
- private static void populateCarbonTable(Configuration configuration) throws IOException {
- String dirs = configuration.get(INPUT_DIR, "");
- String[] inputPaths = StringUtils.split(dirs);
- if (inputPaths.length == 0) {
- throw new InvalidPathException("No input paths specified in job");
+ private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ if (carbonTable == null) {
+ // the carbon table should be created either from the deserialized table info (schema
+ // saved in the hive metastore) or by reading the schema file from HDFS
+ TableInfo tableInfo = getTableInfo(configuration);
+ CarbonTable carbonTable;
+ if (tableInfo != null) {
+ carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+ } else {
+ carbonTable = SchemaReader.readCarbonTableFromStore(
+ getAbsoluteTableIdentifier(configuration));
+ }
+ this.carbonTable = carbonTable;
+ return carbonTable;
+ } else {
+ return this.carbonTable;
}
- AbsoluteTableIdentifier absoluteTableIdentifier =
- AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
- // read the schema file to get the absoluteTableIdentifier having the correct table id
- // persisted in the schema
- CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
- setCarbonTable(configuration, carbonTable);
}
public static void setTablePath(Configuration configuration, String tablePath)
throws IOException {
configuration.set(FileInputFormat.INPUT_DIR, tablePath);
}
-
/**
* It sets unresolved filter expression.
*
@@ -213,9 +211,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
}
- private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
throws IOException {
- return getCarbonTable(configuration).getAbsoluteTableIdentifier();
+ String dirs = configuration.get(INPUT_DIR, "");
+ String[] inputPaths = StringUtils.split(dirs);
+ if (inputPaths.length == 0) {
+ throw new InvalidPathException("No input paths specified in job");
+ }
+ return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
}
/**
@@ -262,7 +265,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
// process and resolve the expression
Expression filter = getFilterPredicates(job.getConfiguration());
- CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+ CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
// this will be null in case of corrupt schema file.
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
@@ -277,7 +280,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
if (null != partitionInfo) {
Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
matchedPartitions = new FilterExpressionProcessor()
- .getFilteredPartitions(filter, partitionInfo, partitioner);
+ .getFilteredPartitions(filter, partitionInfo);
if (matchedPartitions.cardinality() == 0) {
// no partition is required
return new ArrayList<InputSplit>();
@@ -320,7 +323,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
Boolean isIUDTable = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
- getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+ getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(absoluteTableIdentifier);
@@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException {
Configuration configuration = taskAttemptContext.getConfiguration();
- CarbonTable carbonTable = getCarbonTable(configuration);
+ CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
// getting the table absoluteTableIdentifier from the carbonTable
// to avoid unnecessary deserialization
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
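getOrCreateCarbonTable above replaces the old populate-into-configuration approach with a task-side cache: build the table at most once, preferring the TableInfo serialized into the configuration and falling back to the schema file in the store. The control flow, reduced to its shape with generic types rather than the CarbonData API:
import java.util.function.Supplier;
class LazyTable<T> {
  private T cached; // built at most once per input-format instance
  synchronized T getOrCreate(T deserializedFromConf, Supplier<T> readFromStore) {
    if (cached == null) {
      cached = (deserializedFromConf != null)
          ? deserializedFromConf   // schema shipped inside the job configuration
          : readFromStore.get();   // schema read from the file system
    }
    return cached;
  }
}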
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 1a8183c..add0578 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -265,7 +265,7 @@ class CarbonMergerRDD[K, V](
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
- CarbonInputFormat.setTableInfo(job.getConfiguration,
+ CarbonTableInputFormat.setTableInfo(job.getConfiguration,
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
var updateDetails: UpdateVO = null
// initialise query_id for job
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f3baf58..d34b91d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -268,7 +268,7 @@ case class DeleteLoadsById(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
tableMeta.carbonTable
CarbonStore.deleteLoadById(
loadids,
@@ -293,7 +293,7 @@ case class DeleteLoadsByLoadDate(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
tableMeta.carbonTable
CarbonStore.deleteLoadByDate(
loadDate,
@@ -847,7 +847,7 @@ case class ShowLoads(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
tableMeta.carbonTable
CarbonStore.showSegments(
getDB.getDatabaseName(databaseNameOp, sparkSession),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 549841b..c9eaf6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
}
}
-class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
@transient
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
System.nanoTime() + ""
}
- lazy val metadata = loadMetadata(storePath, nextQueryId)
+ val metadata = MetaData(new ArrayBuffer[TableMeta]())
/**
@@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
override def createCarbonRelation(parameters: Map[String, String],
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): CarbonRelation = {
- lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
- Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
- .asInstanceOf[CarbonRelation]
+ val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
+ val tables = getTableFromMetadataCache(database, tableName)
+ tables match {
+ case Some(t) =>
+ CarbonRelation(database, tableName,
+ CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+ case None =>
+ readCarbonSchema(absIdentifier) match {
+ case Some(meta) =>
+ CarbonRelation(database, tableName,
+ CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+ case None =>
+ throw new NoSuchTableException(database, tableName)
+ }
+ }
}
def lookupRelation(dbName: Option[String], tableName: String)
@@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
}
- def lookupRelation(tableIdentifier: TableIdentifier)
+ override def lookupRelation(tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): LogicalPlan = {
- checkSchemasModifiedTimeAndReloadTables()
val database = tableIdentifier.database.getOrElse(
- sparkSession.catalog.currentDatabase
- )
- val tables = getTableFromMetadata(database, tableIdentifier.table, true)
- tables match {
- case Some(t) =>
- CarbonRelation(database, tableIdentifier.table,
- CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
- case None =>
- throw new NoSuchTableException(database, tableIdentifier.table)
+ sparkSession.catalog.currentDatabase)
+ val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+ case SubqueryAlias(_,
+ LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+ _) =>
+ carbonDatasourceHadoopRelation.carbonRelation
+ case LogicalRelation(
+ carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+ carbonDatasourceHadoopRelation.carbonRelation
+ case _ => throw new NoSuchTableException(database, tableIdentifier.table)
}
+ relation
}
/**
@@ -123,11 +138,10 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param tableName
* @return
*/
- def getTableFromMetadata(database: String,
- tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+ def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
metadata.tablesMeta
.find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
}
def tableExists(
@@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
tableExists(TableIdentifier(table, databaseOp))(sparkSession)
}
- def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- checkSchemasModifiedTimeAndReloadTables()
- val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val tables = metadata.tablesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
- tables.nonEmpty
- }
-
- def loadMetadata(metadataPath: String, queryId: String): MetaData = {
- val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val statistic = new QueryStatistic()
- // creating zookeeper instance once.
- // if zookeeper is configured as carbon lock type.
- val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
- if (null != zookeeperurl) {
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
- }
- if (metadataPath == null) {
- return null
- }
- // if no locktype is configured and store type is HDFS set HDFS lock as default
- if (null == CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
- FileType.HDFS == FileFactory.getFileType(metadataPath)) {
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.LOCK_TYPE,
- CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
- )
- LOGGER.info("Default lock type HDFSLOCK is configured")
+ override def tableExists(tableIdentifier: TableIdentifier)
+ (sparkSession: SparkSession): Boolean = {
+ try {
+ lookupRelation(tableIdentifier)(sparkSession)
+ } catch {
+ case e: Exception =>
+ return false
}
- val fileType = FileFactory.getFileType(metadataPath)
- val metaDataBuffer = new ArrayBuffer[TableMeta]
- fillMetaData(metadataPath, fileType, metaDataBuffer)
- updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
- statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
- System.currentTimeMillis())
- recorder.recordStatisticsForDriver(statistic, queryId)
- MetaData(metaDataBuffer)
+ true
}
- private def fillMetaData(basePath: String, fileType: FileType,
- metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
- val databasePath = basePath // + "/schemas"
- try {
- if (FileFactory.isFileExist(databasePath, fileType)) {
- val file = FileFactory.getCarbonFile(databasePath, fileType)
- val databaseFolders = file.listFiles()
-
- databaseFolders.foreach(databaseFolder => {
- if (databaseFolder.isDirectory) {
- val dbName = databaseFolder.getName
- val tableFolders = databaseFolder.listFiles()
-
- tableFolders.foreach(tableFolder => {
- if (tableFolder.isDirectory) {
- val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
- tableFolder.getName, UUID.randomUUID().toString)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
- carbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableName = tableFolder.getName
- val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
- val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
- val schemaFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
- wrapperTableInfo.setStorePath(storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
- carbonTable)
- }
- }
- })
- }
- })
- } else {
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
- }
- } catch {
- case s: java.io.FileNotFoundException =>
- s.printStackTrace()
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
+ private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+ val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = identifier.getCarbonTableIdentifier.getTableName
+ val storePath = identifier.getStorePath
+ val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
+ tableName.toLowerCase(), UUID.randomUUID().toString)
+ val carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val fileType = FileFactory.getFileType(tableMetadataFile)
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val tableUniqueName = dbName + "_" + tableName
+ val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+ val schemaFilePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+ wrapperTableInfo.setStorePath(storePath)
+ wrapperTableInfo
+ .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
+ identifier.getStorePath,
+ identifier.getTablePath,
+ carbonTable)
+ metadata.tablesMeta += tableMeta
+ Some(tableMeta)
+ } else {
+ None
}
}
@@ -238,28 +201,36 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param newTableIdentifier
* @param thriftTableInfo
* @param schemaEvolutionEntry
- * @param carbonStorePath
+ * @param tablePath
* @param sparkSession
*/
def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
- carbonStorePath: String)
- (sparkSession: SparkSession): String = {
+ tablePath: String) (sparkSession: SparkSession): String = {
+ val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
}
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
- newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName,
- carbonStorePath)
- createSchemaThriftFile(wrapperTableInfo,
+ newTableIdentifier.getDatabaseName,
+ newTableIdentifier.getTableName,
+ absoluteTableIdentifier.getStorePath)
+ val identifier =
+ new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
+ newTableIdentifier.getTableName,
+ wrapperTableInfo.getFactTable.getTableId)
+ val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName)(sparkSession)
+ identifier)
+ addTableCache(wrapperTableInfo,
+ AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
+ newTableIdentifier.getDatabaseName,
+ newTableIdentifier.getTableName))
+ path
}
/**
@@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param carbonStorePath
+ * @param tablePath
* @param sparkSession
*/
def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- carbonStorePath: String)
- (sparkSession: SparkSession): String = {
+ tablePath: String)(sparkSession: SparkSession): String = {
+ val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
carbonTableIdentifier.getDatabaseName,
carbonTableIdentifier.getTableName,
- carbonStorePath)
+ tableIdentifier.getStorePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
- createSchemaThriftFile(wrapperTableInfo,
+ wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+ val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- carbonTableIdentifier.getDatabaseName,
- carbonTableIdentifier.getTableName)(sparkSession)
+ tableIdentifier.getCarbonTableIdentifier)
+ addTableCache(wrapperTableInfo, tableIdentifier)
+ path
}
@@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* Load CarbonTable from wrapper tableInfo
*
*/
- def createTableFromThrift(
- tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
- dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
- if (tableExists(tableName, Some(dbName))(sparkSession)) {
- sys.error(s"Table [$tableName] already exists under Database [$dbName]")
- }
- val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+ def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val dbName = tableInfo.getDatabaseName
+ val tableName = tableInfo.getFactTable.getTableName
val thriftTableInfo = schemaConverter
.fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
- .add(schemaEvolutionEntry)
- val carbonTablePath = createSchemaThriftFile(tableInfo,
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ tableInfo.setStorePath(identifier.getStorePath)
+ createSchemaThriftFile(tableInfo,
thriftTableInfo,
- dbName,
- tableName)(sparkSession)
+ identifier.getCarbonTableIdentifier)
LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
- (carbonTablePath, "")
+ }
+
+ /**
+ * Generates schema string from TableInfo
+ */
+ override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+ tablePath: String): String = {
+ val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+ val schemaMetadataPath =
+ CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+ tableInfo.setMetaDataFilepath(schemaMetadataPath)
+ tableInfo.setStorePath(tableIdentifier.getStorePath)
+ val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+ tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+ removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
+ CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+ addTableCache(tableInfo, tableIdentifier)
+ CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
}
/**
@@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
*
* @param tableInfo
* @param thriftTableInfo
- * @param dbName
- * @param tableName
- * @param sparkSession
* @return
*/
- private def createSchemaThriftFile(
- tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
- thriftTableInfo: org.apache.carbondata.format.TableInfo,
- dbName: String, tableName: String)
- (sparkSession: SparkSession): String = {
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
- tableInfo.getFactTable.getTableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
+ thriftTableInfo: TableInfo,
+ carbonTableIdentifier: CarbonTableIdentifier): String = {
+ val carbonTablePath = CarbonStorePath.
+ getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(storePath)
val fileType = FileFactory.getFileType(schemaMetadataPath)
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
thriftWriter.open(FileWriteOperation.OVERWRITE)
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
- removeTableFromMetadata(dbName, tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+ carbonTablePath.getPath
+ }
+
+ protected def addTableCache(tableInfo: table.TableInfo,
+ absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+ val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
+ CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
+ removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
- CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
+ val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+ absoluteTableIdentifier.getTablePath,
+ CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
metadata.tablesMeta += tableMeta
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- carbonTablePath.getPath
}
/**
@@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param tableName
*/
def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
metadataToBeRemoved match {
case Some(tableMeta) =>
metadata.tablesMeta -= tableMeta
CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
case None =>
- LOGGER.debug(s"No entry for table $tableName in database $dbName")
+ if (LOGGER.isDebugEnabled) {
+ LOGGER.debug(s"No entry for table $tableName in database $dbName")
+ }
}
}
@@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val tableName = tableIdentifier.table.toLowerCase
-
- val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
- val fileType = FileFactory.getFileType(tablePath)
- FileFactory.isFileExist(tablePath, fileType)
+ try {
+ val tablePath = lookupRelation(tableIdentifier)(sparkSession).
+ asInstanceOf[CarbonRelation].tableMeta.tablePath
+ val fileType = FileFactory.getFileType(tablePath)
+ FileFactory.isFileExist(tablePath, fileType)
+ } catch {
+ case e: Exception =>
+ false
+ }
}
- def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+ def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
(sparkSession: SparkSession) {
val dbName = tableIdentifier.database.get
val tableName = tableIdentifier.table
-
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
@@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
// while dropping we should refresh the schema modified time so that if anything has changed
// in another beeline session it gets updated.
- checkSchemasModifiedTimeAndReloadTables
-
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
- tableIdentifier.table)
- metadataToBeRemoved match {
- case Some(tableMeta) =>
- metadata.tablesMeta -= tableMeta
- CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- case None =>
- LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
- }
+ checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+
+ removeTableFromMetadata(dbName, tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
}
}
- private def getTimestampFileAndType(databaseName: String, tableName: String) = {
- val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+ private def getTimestampFileAndType(basePath: String) = {
+ val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
val timestampFileType = FileFactory.getFileType(timestampFile)
(timestampFile, timestampFileType)
}
@@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
tableModifiedTimeStore.put("default", timeStamp)
}
- def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
+ def updateAndTouchSchemasUpdatedTime(basePath: String) {
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
}
- /**
- * This method will read the timestamp of empty schema file
- *
- * @param databaseName
- * @param tableName
- * @return
- */
- private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
- } else {
- System.currentTimeMillis()
- }
- }
/**
* This method will check and create an empty schema timestamp file
*
- * @param databaseName
- * @param tableName
* @return
*/
- private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+ private def touchSchemaFileSystemTime(basePath: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+ LOGGER.audit(s"Creating timestamp file for $basePath")
FileFactory.createNewFile(timestampFile, timestampFileType)
}
val systemTime = System.currentTimeMillis()
@@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
systemTime
}
- def checkSchemasModifiedTimeAndReloadTables() {
- val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+ def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+ val (timestampFile, timestampFileType) =
+ getTimestampFileAndType(storePath)
if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
getLastModifiedTime ==
@@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
}
private def refreshCache() {
- metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+ metadata.tablesMeta.clear()
}
override def isReadFromHiveMetaStore: Boolean = false
@@ -527,4 +492,3 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
CarbonUtil.readSchemaFile(tableMetadataFile)
}
}
-
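Taken together, touchSchemaFileSystemTime and checkSchemasModifiedTimeAndReloadTables implement a cross-session invalidation handshake: every schema change touches one empty marker file under the store path, and each session compares that file's modification time against its cached value before trusting its table cache. A generic java.nio sketch of the same handshake (the real code goes through FileFactory and CarbonCommonConstants):
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
class SchemaModificationMarker {
  private long cachedModifiedTime;
  boolean needsReload(Path markerFile) throws IOException {
    if (!Files.exists(markerFile)) {
      return false;
    }
    long modified = Files.getLastModifiedTime(markerFile).toMillis();
    if (modified != cachedModifiedTime) {
      cachedModifiedTime = modified;
      return true; // another session changed a schema: drop cached tables
    }
    return false;
  }
  long touch(Path markerFile) throws IOException {
    if (!Files.exists(markerFile)) {
      Files.createFile(markerFile);
    }
    long now = System.currentTimeMillis();
    Files.setLastModifiedTime(markerFile, FileTime.fromMillis(now));
    return now;
  }
}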