You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/04/26 10:40:18 UTC
carbondata git commit: [CARBONDATA-2362] Changing the Cacheable
object from DataMap to Wrapper
Repository: carbondata
Updated Branches:
refs/heads/branch-1.3 bb35c255e -> 0b0d8e653
[CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper
1. Cacheable object is changed from dataMap to wrapper
2. Blocklet info is stored as binary in dataMap
This closes #2187
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0b0d8e65
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0b0d8e65
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0b0d8e65
Branch: refs/heads/branch-1.3
Commit: 0b0d8e6536ff1d657e5201affd671c2e21c03ad9
Parents: bb35c25
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Apr 25 11:32:26 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu Apr 26 16:12:52 2018 +0530
----------------------------------------------------------------------
.../dictionary/AbstractDictionaryCache.java | 4 +-
.../carbondata/core/datamap/TableDataMap.java | 10 +-
.../core/datamap/dev/CacheableDataMap.java | 10 +-
.../core/datastore/BlockIndexStore.java | 3 +-
.../core/datastore/SegmentTaskIndexStore.java | 4 +-
.../indexstore/BlockletDataMapIndexStore.java | 114 ++++++++++++-----
.../indexstore/BlockletDataMapIndexWrapper.java | 52 ++++++++
.../core/indexstore/BlockletDetailInfo.java | 60 +++++++--
.../core/indexstore/BlockletDetailsFetcher.java | 10 --
.../core/indexstore/SafeMemoryDMStore.java | 11 ++
.../blockletindex/BlockletDataMap.java | 73 +++++------
.../BlockletDataMapDistributable.java | 18 +--
.../blockletindex/BlockletDataMapFactory.java | 128 +++++++++----------
.../blockletindex/SegmentIndexFileStore.java | 21 ++-
.../core/indexstore/schema/CarbonRowSchema.java | 4 +
.../core/metadata/SegmentFileStore.java | 29 +++++
.../core/metadata/blocklet/BlockletInfo.java | 8 +-
.../core/metadata/blocklet/DataFileFooter.java | 2 +-
.../core/metadata/schema/table/TableInfo.java | 23 ++++
.../core/util/BlockletDataMapUtil.java | 120 ++++++++++++-----
.../TestBlockletDataMapFactory.java | 27 ++--
.../apache/carbondata/hadoop/CacheClient.java | 26 ++--
.../hadoop/api/AbstractDataMapJob.java | 7 +-
.../carbondata/hadoop/api/DataMapJob.java | 5 +-
24 files changed, 503 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index 9ed9007..78feb4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -59,7 +59,9 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
initThreadPoolSize();
}
- @Override public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
+ @Override
+ public void put(DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier,
+ Dictionary value) {
throw new UnsupportedOperationException("Operation not supported");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 020d6c9..23e7695 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -126,11 +126,11 @@ public final class TableDataMap extends OperationEventListener {
for (DataMap dataMap : dataMaps) {
blocklets.addAll(dataMap.prune(filterExp, partitions));
}
- for (Blocklet blocklet: blocklets) {
- ExtendedBlocklet detailedBlocklet =
- blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegment());
- detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo());
- detailedBlocklets.add(detailedBlocklet);
+ List<ExtendedBlocklet> detailedBlockletList =
+ blockletDetailsFetcher.getExtendedBlocklets(blocklets, distributable.getSegment());
+ for (ExtendedBlocklet blocklet : detailedBlockletList) {
+ blocklet.setSegmentId(distributable.getSegment().getSegmentNo());
+ detailedBlocklets.add(blocklet);
}
return detailedBlocklets;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
index 885b21f..19ec94c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.List;
import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.memory.MemoryException;
/**
@@ -29,11 +31,13 @@ import org.apache.carbondata.core.memory.MemoryException;
public interface CacheableDataMap {
/**
- * Add the dataMap to cache
+ * Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier
*
- * @param dataMap
+ * @param tableBlockIndexUniqueIdentifier
+ * @param blockletDataMapIndexWrapper
*/
- void cache(DataMap dataMap) throws IOException, MemoryException;
+ void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException;
/**
* Get all the uncached distributables from the list.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
index 609bf1c..8055ffd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
@@ -229,7 +229,8 @@ public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
.remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
}
- @Override public void put(TableBlockUniqueIdentifier key, AbstractIndex value) {
+ @Override
+ public void put(TableBlockUniqueIdentifier tableBlockUniqueIdentifier, AbstractIndex value) {
throw new UnsupportedOperationException("Operation not supported");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 744cc93..2d6e77c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -140,7 +140,9 @@ public class SegmentTaskIndexStore
lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}
- @Override public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value) {
+ @Override
+ public void put(TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
+ SegmentTaskIndexWrapper value) {
throw new UnsupportedOperationException("Operation not supported");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index ce0fe8b..1f31bd9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
@@ -41,7 +42,7 @@ import org.apache.carbondata.core.util.BlockletDataMapUtil;
* blocks
*/
public class BlockletDataMapIndexStore
- implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+ implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
/**
@@ -68,54 +69,90 @@ public class BlockletDataMapIndexStore
}
@Override
- public BlockletDataMap get(TableBlockIndexUniqueIdentifier identifier)
+ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier)
throws IOException {
String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
- BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
- if (dataMap == null) {
+ BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
+ (BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
+ List<DataMap> dataMaps = new ArrayList<>();
+ if (blockletDataMapIndexWrapper == null) {
try {
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
Set<String> filesRead = new HashSet<>();
- Map<String, BlockMetaInfo> blockMetaInfoMap =
- BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
- dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
- } catch (MemoryException e) {
+ long memorySize = 0L;
+ String segmentFilePath = identifier.getIndexFilePath();
+ Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = BlockletDataMapUtil
+ .createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+ // if the identifier is not a merge file we can directly load the datamaps
+ if (identifier.getMergeIndexFileName() == null) {
+ Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+ .getBlockMetaInfoMap(identifier, indexFileStore, filesRead,
+ carbonDataFileBlockMetaInfoMapping);
+ BlockletDataMap blockletDataMap =
+ loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
+ memorySize += blockletDataMap.getMemorySize();
+ dataMaps.add(blockletDataMap);
+ blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+ } else {
+ // if the identifier is a merge file then collect the index files and load the datamaps
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
+ for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
+ tableBlockIndexUniqueIdentifiers) {
+ Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+ .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead,
+ carbonDataFileBlockMetaInfoMapping);
+ BlockletDataMap blockletDataMap =
+ loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
+ memorySize += blockletDataMap.getMemorySize();
+ dataMaps.add(blockletDataMap);
+ }
+ blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+ }
+ lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
+ memorySize);
+ } catch (Throwable e) {
+ // clear all the memory used by datamaps loaded
+ for (DataMap dataMap : dataMaps) {
+ dataMap.clear();
+ }
LOGGER.error("memory exception when loading datamap: " + e.getMessage());
throw new RuntimeException(e.getMessage(), e);
}
}
- return dataMap;
+ return blockletDataMapIndexWrapper;
}
@Override
- public List<BlockletDataMap> getAll(
+ public List<BlockletDataMapIndexWrapper> getAll(
List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
- List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+ List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
+ new ArrayList<>(tableSegmentUniqueIdentifiers.size());
List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
ExecutorService service = null;
+ BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null;
// Get the datamaps for each indexfile from cache.
try {
for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
- BlockletDataMap ifPresent = getIfPresent(identifier);
- if (ifPresent != null) {
- blockletDataMaps.add(ifPresent);
+ BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier);
+ if (dataMapIndexWrapper != null) {
+ blockletDataMapIndexWrappers.add(dataMapIndexWrapper);
} else {
missedIdentifiers.add(identifier);
}
}
if (missedIdentifiers.size() > 0) {
- SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
- Set<String> filesRead = new HashSet<>();
- for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
- Map<String, BlockMetaInfo> blockMetaInfoMap =
- BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
- blockletDataMaps.add(
- loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
+ for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) {
+ blockletDataMapIndexWrapper = get(identifier);
+ blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper);
}
}
} catch (Throwable e) {
- for (BlockletDataMap dataMap : blockletDataMaps) {
- dataMap.clear();
+ if (null != blockletDataMapIndexWrapper) {
+ List<DataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
+ for (DataMap dataMap : dataMaps) {
+ dataMap.clear();
+ }
}
throw new IOException("Problem in loading segment blocks.", e);
} finally {
@@ -123,7 +160,7 @@ public class BlockletDataMapIndexStore
service.shutdownNow();
}
}
- return blockletDataMaps;
+ return blockletDataMapIndexWrappers;
}
/**
@@ -133,10 +170,10 @@ public class BlockletDataMapIndexStore
* @return
*/
@Override
- public BlockletDataMap getIfPresent(
+ public BlockletDataMapIndexWrapper getIfPresent(
TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
- return (BlockletDataMap) lruCache.get(
- tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+ return (BlockletDataMapIndexWrapper) lruCache
+ .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}
/**
@@ -149,14 +186,16 @@ public class BlockletDataMapIndexStore
lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}
- @Override public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
- BlockletDataMap blockletDataMap) throws IOException, MemoryException {
+ @Override
+ public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
if (lock == null) {
lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
}
+ long memorySize = 0L;
// As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry
// as in that case clearing unsafe memory need to be taken card. If at all datamap entry
// in the cache need to be overwritten then use the invalidate interface
@@ -164,13 +203,20 @@ public class BlockletDataMapIndexStore
if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
synchronized (lock) {
if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+ List<DataMap> dataMaps = wrapper.getDataMaps();
try {
- blockletDataMap.convertToUnsafeDMStore();
- lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(),
- blockletDataMap, blockletDataMap.getMemorySize());
+ for (DataMap dataMap: dataMaps) {
+ BlockletDataMap blockletDataMap = (BlockletDataMap) dataMap;
+ blockletDataMap.convertToUnsafeDMStore();
+ memorySize += blockletDataMap.getMemorySize();
+ }
+ lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper,
+ memorySize);
} catch (Throwable e) {
// clear all the memory acquired by data map in case of any failure
- blockletDataMap.clear();
+ for (DataMap blockletDataMap : dataMaps) {
+ blockletDataMap.clear();
+ }
throw new IOException("Problem in adding datamap to cache.", e);
}
}
@@ -205,8 +251,6 @@ public class BlockletDataMapIndexStore
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
blockMetaInfoMap, identifier.getSegmentId()));
- lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
- dataMap.getMemorySize());
}
return dataMap;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
new file mode 100644
index 0000000..1f0c089
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+
+/**
+ * A cacheable wrapper of datamaps
+ */
+public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
+
+ private List<DataMap> dataMaps;
+
+ public BlockletDataMapIndexWrapper(List<DataMap> dataMaps) {
+ this.dataMaps = dataMaps;
+ }
+
+ @Override public long getFileTimeStamp() {
+ return 0;
+ }
+
+ @Override public int getAccessCount() {
+ return 0;
+ }
+
+ @Override public long getMemorySize() {
+ return 0;
+ }
+
+ public List<DataMap> getDataMaps() {
+ return dataMaps;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index ce05fe2..cfdb3f0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -22,20 +22,29 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.hadoop.io.Writable;
-import org.xerial.snappy.Snappy;
/**
* Blocklet detail information to be sent to each executor
*/
public class BlockletDetailInfo implements Serializable, Writable {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletDetailInfo.class.getName());
+
+ private static final long serialVersionUID = -4160667488700814493L;
+
private int rowCount;
private short pagesCount;
@@ -48,7 +57,9 @@ public class BlockletDetailInfo implements Serializable, Writable {
private long schemaUpdatedTimeStamp;
- private BlockletInfo blockletInfo;
+ private transient BlockletInfo blockletInfo;
+
+ private byte[] blockletInfoBinary;
private long blockFooterOffset;
@@ -90,6 +101,26 @@ public class BlockletDetailInfo implements Serializable, Writable {
this.blockletInfo = blockletInfo;
}
+ private void setBlockletInfoFromBinary() throws IOException {
+ if (null == this.blockletInfo && null != blockletInfoBinary && blockletInfoBinary.length > 0) {
+ blockletInfo = new BlockletInfo();
+ ByteArrayInputStream stream = new ByteArrayInputStream(blockletInfoBinary);
+ DataInputStream inputStream = new DataInputStream(stream);
+ try {
+ blockletInfo.readFields(inputStream);
+ } catch (IOException e) {
+ LOGGER.error("Problem in reading blocklet info");
+ throw new IOException("Problem in reading blocklet info." + e.getMessage());
+ } finally {
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ LOGGER.error(e, "Problem in closing input stream of reading blocklet info.");
+ }
+ }
+ }
+ }
+
public int[] getDimLens() {
return dimLens;
}
@@ -131,6 +162,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
out.writeLong(blockFooterOffset);
out.writeInt(columnSchemaBinary.length);
out.write(columnSchemaBinary);
+ out.writeInt(blockletInfoBinary.length);
+ out.write(blockletInfoBinary);
out.writeLong(blockSize);
}
@@ -153,6 +186,10 @@ public class BlockletDetailInfo implements Serializable, Writable {
byte[] schemaArray = new byte[bytesSize];
in.readFully(schemaArray);
readColumnSchema(schemaArray);
+ int byteSize = in.readInt();
+ blockletInfoBinary = new byte[byteSize];
+ in.readFully(blockletInfoBinary);
+ setBlockletInfoFromBinary();
blockSize = in.readLong();
}
@@ -162,17 +199,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
* @throws IOException
*/
public void readColumnSchema(byte[] schemaArray) throws IOException {
- // uncompress it.
- schemaArray = Snappy.uncompress(schemaArray);
- ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
- DataInput schemaInput = new DataInputStream(schemaStream);
- columnSchemas = new ArrayList<>();
- int size = schemaInput.readShort();
- for (int i = 0; i < size; i++) {
- ColumnSchema columnSchema = new ColumnSchema();
- columnSchema.readFields(schemaInput);
- columnSchemas.add(columnSchema);
- }
+ BlockletDataMap blockletDataMap = new BlockletDataMap();
+ columnSchemas = blockletDataMap.readColumnSchema(schemaArray);
}
/**
@@ -224,4 +252,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
public byte[] getColumnSchemaBinary() {
return columnSchemaBinary;
}
+
+ public void setBlockletInfoBinary(byte[] blockletInfoBinary) {
+ this.blockletInfoBinary = blockletInfoBinary;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index b4d6db2..a2f49c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -36,14 +36,4 @@ public interface BlockletDetailsFetcher {
*/
List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
throws IOException;
-
- /**
- * Get the blocklet detail information based on blockletid, blockid and segmentid.
- *
- * @param blocklet
- * @param segment
- * @return
- * @throws IOException
- */
- ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
index d51f28d..84dd91e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.DataTypeUtil;
/**
* Store the data map row @{@link DataMapRow} data to memory.
@@ -83,6 +84,7 @@ public class SafeMemoryDMStore extends AbstractMemoryDMStore {
@Override
public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+ setSchemaDataType();
UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema);
for (DataMapRow dataMapRow : dataMapRows) {
unsafeMemoryDMStore.addIndexRow(dataMapRow);
@@ -91,4 +93,13 @@ public class SafeMemoryDMStore extends AbstractMemoryDMStore {
return unsafeMemoryDMStore;
}
+ /**
+ * Set the dataType to the schema. Needed in case of serialization / deserialization
+ */
+ private void setSchemaDataType() {
+ for (CarbonRowSchema carbonRowSchema : schema) {
+ carbonRowSchema.setDataType(DataTypeUtil.valueOf(carbonRowSchema.getDataType(), 0, 0));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 66fa0aa..058f500 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.indexstore.blockletindex;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -32,7 +33,6 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cacheable;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
@@ -46,7 +46,6 @@ import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
-import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -78,7 +77,7 @@ import org.xerial.snappy.Snappy;
/**
* Datamap implementation for blocklet.
*/
-public class BlockletDataMap implements DataMap, Cacheable {
+public class BlockletDataMap implements DataMap {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BlockletDataMap.class.getName());
@@ -134,7 +133,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
private int[] columnCardinality;
- private TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier;
+ private long blockletSchemaTime;
@Override
public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
@@ -159,6 +158,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
if (segmentProperties == null) {
List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
schemaBinary = convertSchemaToBinary(columnInTable);
+ blockletSchemaTime = fileFooter.getSchemaUpdatedTimeStamp();
columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
createSchema(segmentProperties,
@@ -207,9 +207,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
segmentId);
summaryDMStore.finishWriting();
}
- LOGGER.info(
- "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is " + (
- System.currentTimeMillis() - startTime));
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is "
+ + (System.currentTimeMillis() - startTime));
+ }
}
private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
@@ -771,23 +773,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
detailInfo.setBlockletId((short) blockletId);
detailInfo.setDimLens(columnCardinality);
detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
- byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
- BlockletInfo blockletInfo = null;
+ detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCK_INFO_INDEX));
try {
- if (byteArray.length > 0) {
- blockletInfo = new BlockletInfo();
- ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
- DataInputStream inputStream = new DataInputStream(stream);
- blockletInfo.readFields(inputStream);
- inputStream.close();
- }
blocklet.setLocation(
new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
.split(","));
} catch (IOException e) {
throw new RuntimeException(e);
}
- detailInfo.setBlockletInfo(blockletInfo);
blocklet.setDetailInfo(detailInfo);
detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
@@ -919,7 +912,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
return dataMapRow;
}
- private byte[] getColumnSchemaBinary() {
+ public byte[] getColumnSchemaBinary() {
DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
return unsafeRow.getByteArray(SCHEMA);
}
@@ -956,17 +949,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
}
}
- @Override
- public long getFileTimeStamp() {
- return 0;
- }
-
- @Override
- public int getAccessCount() {
- return 0;
- }
-
- @Override
public long getMemorySize() {
long memoryUsed = 0L;
if (memoryDMStore != null) {
@@ -978,15 +960,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
return memoryUsed;
}
- public TableBlockIndexUniqueIdentifier getTableBlockUniqueIdentifier() {
- return tableBlockUniqueIdentifier;
- }
-
- public void setTableBlockUniqueIdentifier(
- TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier) {
- this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier;
- }
-
public void setSegmentProperties(SegmentProperties segmentProperties) {
this.segmentProperties = segmentProperties;
}
@@ -995,6 +968,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
return columnCardinality;
}
+ public long getBlockletSchemaTime() {
+ return blockletSchemaTime;
+ }
+
private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe)
throws MemoryException {
AbstractMemoryDMStore memoryDMStore;
@@ -1024,4 +1001,24 @@ public class BlockletDataMap implements DataMap, Cacheable {
}
}
+ /**
+ * Read column schema from binary
+ * @param schemaArray
+ * @throws IOException
+ */
+ public List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
+ // uncompress it.
+ schemaArray = Snappy.uncompress(schemaArray);
+ ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
+ DataInput schemaInput = new DataInputStream(schemaStream);
+ List<ColumnSchema> columnSchemas = new ArrayList<>();
+ int size = schemaInput.readShort();
+ for (int i = 0; i < size; i++) {
+ ColumnSchema columnSchema = new ColumnSchema();
+ columnSchema.readFields(schemaInput);
+ columnSchemas.add(columnSchema);
+ }
+ return columnSchemas;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 02ac8d7..08d8fc9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -16,8 +16,6 @@
*/
package org.apache.carbondata.core.indexstore.blockletindex;
-import java.util.Set;
-
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
@@ -34,24 +32,22 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
*/
private String filePath;
- private Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers;
+ private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
- public BlockletDataMapDistributable(String indexFilePath,
- Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers) {
+ public BlockletDataMapDistributable(String indexFilePath) {
this.filePath = indexFilePath;
- this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
}
public String getFilePath() {
return filePath;
}
- public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers() {
- return tableBlockIndexUniqueIdentifiers;
+ public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() {
+ return tableBlockIndexUniqueIdentifier;
}
- public void setTableBlockIndexUniqueIdentifiers(
- Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers) {
- this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
+ public void setTableBlockIndexUniqueIdentifier(
+ TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier) {
+ this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 6d8feeb..0cc3ef0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -37,12 +37,12 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
@@ -63,7 +63,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
// segmentId -> list of index file
private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
- private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+ private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
@Override
public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
@@ -82,13 +82,18 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment);
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
new ArrayList<>(identifiers.size());
+ List<DataMap> dataMaps = new ArrayList<>();
tableBlockIndexUniqueIdentifiers.addAll(identifiers);
- return cache.getAll(tableBlockIndexUniqueIdentifiers);
+ List<BlockletDataMapIndexWrapper> dataMapsList = cache.getAll(tableBlockIndexUniqueIdentifiers);
+ for (BlockletDataMapIndexWrapper wrapper : dataMapsList) {
+ dataMaps.addAll(wrapper.getDataMaps());
+ }
+ return dataMaps;
}
- @Override public void cache(DataMap dataMap) throws IOException, MemoryException {
- BlockletDataMap blockletDataMap = (BlockletDataMap) dataMap;
- cache.put(blockletDataMap.getTableBlockUniqueIdentifier(), blockletDataMap);
+ @Override public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+ BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
+ cache.put(tableBlockIndexUniqueIdentifier, wrapper);
}
@Override
@@ -100,16 +105,13 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
getTableBlockIndexUniqueIdentifiers(segment);
// filter out the tableBlockIndexUniqueIdentifiers based on distributable
- Set<TableBlockIndexUniqueIdentifier> validIdentifiers = BlockletDataMapUtil
+ TableBlockIndexUniqueIdentifier validIdentifier = BlockletDataMapUtil
.filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers,
(BlockletDataMapDistributable) distributable);
- ((BlockletDataMapDistributable) distributable)
- .setTableBlockIndexUniqueIdentifiers(validIdentifiers);
- for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : validIdentifiers) {
- if (null == cache.getIfPresent(tableBlockIndexUniqueIdentifier)) {
- distributablesToBeLoaded.add(distributable);
- break;
- }
+ if (null == cache.getIfPresent(validIdentifier)) {
+ ((BlockletDataMapDistributable) distributable)
+ .setTableBlockIndexUniqueIdentifier(validIdentifier);
+ distributablesToBeLoaded.add(distributable);
}
}
return distributablesToBeLoaded;
@@ -147,71 +149,56 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
getTableBlockIndexUniqueIdentifiers(segment);
// Retrieve each blocklets detail information from blocklet datamap
for (Blocklet blocklet : blocklets) {
- detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
+ detailedBlocklets.addAll(getExtendedBlocklet(identifiers, blocklet));
}
return detailedBlocklets;
}
- @Override
- public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
- throws IOException {
- if (blocklet instanceof ExtendedBlocklet) {
- return (ExtendedBlocklet) blocklet;
- }
- Set<TableBlockIndexUniqueIdentifier> identifiers =
- getTableBlockIndexUniqueIdentifiers(segment);
- return getExtendedBlocklet(identifiers, blocklet);
- }
-
- private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers,
- Blocklet blocklet) throws IOException {
+ private List<ExtendedBlocklet> getExtendedBlocklet(
+ Set<TableBlockIndexUniqueIdentifier> identifiers, Blocklet blocklet) throws IOException {
+ List<ExtendedBlocklet> blocklets = new ArrayList<>();
String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
- for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
- if (identifier.getIndexFilePath().equals(carbonIndexFileName)) {
- DataMap dataMap = cache.get(identifier);
- return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+ try {
+ for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
+ if (identifier.getIndexFilePath().equals(carbonIndexFileName)) {
+ BlockletDataMapIndexWrapper wrapper = cache.get(identifier);
+ List<DataMap> dataMaps = wrapper.getDataMaps();
+ for (DataMap dataMap : dataMaps) {
+ BlockletDataMap blockletDataMap = (BlockletDataMap) dataMap;
+ blocklets
+ .add(blockletDataMap.getDetailedBlocklet(blocklet.getBlockletId()));
+ }
+ }
}
+ return blocklets;
+ } catch (Exception e) {
+ throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
}
- throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
}
@Override
public List<DataMapDistributable> toDistributable(Segment segment) {
List<DataMapDistributable> distributables = new ArrayList<>();
- Map<String, String> indexFiles = null;
try {
- CarbonFile[] carbonIndexFiles;
- if (segment.getSegmentFileName() == null) {
- carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
- } else {
- SegmentFileStore fileStore =
- new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
- indexFiles = fileStore.getIndexFiles();
- carbonIndexFiles = new CarbonFile[indexFiles.size()];
- int i = 0;
- for (Map.Entry<String, String> entry : indexFiles.entrySet()) {
- String indexFile = entry.getKey();
- String mergeFileName = entry.getValue();
- if (null != mergeFileName) {
- String mergeIndexPath = indexFile
- .substring(0, indexFile.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1)
- + mergeFileName;
- carbonIndexFiles[i++] = FileFactory.getCarbonFile(mergeIndexPath);
- } else {
- carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
- }
- }
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ getTableBlockIndexUniqueIdentifiers(segment);
+ CarbonFile[] carbonIndexFiles = new CarbonFile[tableBlockIndexUniqueIdentifiers.size()];
+ int identifierCounter = 0;
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+ tableBlockIndexUniqueIdentifiers) {
+ String indexFilePath = tableBlockIndexUniqueIdentifier.getIndexFilePath();
+ String fileName = tableBlockIndexUniqueIdentifier.getIndexFileName();
+ carbonIndexFiles[identifierCounter++] = FileFactory
+ .getCarbonFile(indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + fileName);
}
for (int i = 0; i < carbonIndexFiles.length; i++) {
Path path = new Path(carbonIndexFiles[i].getPath());
-
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
LocatedFileStatus fileStatus = iter.next();
String[] location = fileStatus.getBlockLocations()[0].getHosts();
BlockletDataMapDistributable distributable =
- new BlockletDataMapDistributable(path.toString(), null);
+ new BlockletDataMapDistributable(path.toString());
distributable.setLocations(location);
distributables.add(distributable);
@@ -232,10 +219,13 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
if (blockIndexes != null) {
for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
- DataMap dataMap = cache.getIfPresent(blockIndex);
- if (dataMap != null) {
+ BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndex);
+ if (wrapper != null) {
+ List<DataMap> dataMaps = wrapper.getDataMaps();
cache.invalidate(blockIndex);
- dataMap.clear();
+ for (DataMap dataMap : dataMaps) {
+ dataMap.clear();
+ }
}
}
}
@@ -259,19 +249,17 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
identifiers
.add(new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo));
} else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString());
String parentPath = carbonFile.getParentFile().getAbsolutePath();
- List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath());
- for (String indexFile : indexFiles) {
- identifiers.add(
- new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(),
- segmentNo));
- }
+ identifiers.add(new TableBlockIndexUniqueIdentifier(parentPath, carbonFile.getName(),
+ carbonFile.getName(), segmentNo));
}
- List<DataMap> dataMaps;
+ List<DataMap> dataMaps = new ArrayList<>();
try {
- dataMaps = cache.getAll(identifiers);
+ List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = cache.getAll(identifiers);
+ for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) {
+ dataMaps.addAll(wrapper.getDataMaps());
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 9364a7a..5dcf885 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -65,9 +65,15 @@ public class SegmentIndexFileStore {
*/
private Map<String, byte[]> carbonIndexMapWithFullPath;
+ /**
+ * Stores the list of index files in a merge file
+ */
+ private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
+
public SegmentIndexFileStore() {
carbonIndexMap = new HashMap<>();
carbonIndexMapWithFullPath = new HashMap<>();
+ carbonMergeFileToIndexFilesMap = new HashMap<>();
}
/**
@@ -182,12 +188,8 @@ public class SegmentIndexFileStore {
Map<String, String> indexFiles = new HashMap<>();
for (int i = 0; i < carbonIndexFiles.length; i++) {
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- List<String> indexFilesFromMergeFile =
- getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath());
- for (String file: indexFilesFromMergeFile) {
- indexFiles.put(carbonIndexFiles[i].getParentFile().getAbsolutePath()
- + CarbonCommonConstants.FILE_SEPARATOR + file, carbonIndexFiles[i].getName());
- }
+ indexFiles
+ .put(carbonIndexFiles[i].getAbsolutePath(), carbonIndexFiles[i].getAbsolutePath());
} else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null);
}
@@ -216,13 +218,14 @@ public class SegmentIndexFileStore {
* @param mergeFilePath
* @throws IOException
*/
- private void readMergeFile(String mergeFilePath) throws IOException {
+ public void readMergeFile(String mergeFilePath) throws IOException {
ThriftReader thriftReader = new ThriftReader(mergeFilePath);
try {
thriftReader.open();
MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
List<String> file_names = indexHeader.getFile_names();
+ carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
assert (file_names.size() == fileData.size());
@@ -423,4 +426,8 @@ public class SegmentIndexFileStore {
+ " is " + (System.currentTimeMillis() - startTime));
return carbondataFileFooter.getBlockletList();
}
+
+ public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
+ return carbonMergeFileToIndexFilesMap;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index adb8715..8a07ea9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -40,6 +40,10 @@ public abstract class CarbonRowSchema implements Serializable {
return dataType;
}
+ public void setDataType(DataType dataType) {
+ this.dataType = dataType;
+ }
+
/**
* Gives length in case of fixed schema other wise returns length
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 82e6eab..e946a17 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -384,6 +384,35 @@ public class SegmentFileStore {
}
/**
+ * Gets the merge index file (when one exists) or else all index files of each location in this segment
+ * @return
+ */
+ public Map<String, String> getIndexOrMergeFiles() {
+ Map<String, String> indexFiles = new HashMap<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + location;
+ }
+ if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+ String mergeFileName = entry.getValue().getMergeFileName();
+ if (null != mergeFileName) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName,
+ entry.getValue().mergeFileName);
+ } else {
+ for (String indexFile : entry.getValue().getFiles()) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+ entry.getValue().mergeFileName);
+ }
+ }
+ }
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
* Gets all carbon index files from this segment
* @return
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index f77358f..8ecdba5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -24,7 +24,6 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -36,12 +35,7 @@ import org.apache.hadoop.io.Writable;
/**
* class to store the information about the blocklet
*/
-public class BlockletInfo implements Serializable, Writable {
-
- /**
- * serialization id
- */
- private static final long serialVersionUID = 1873135459695635381L;
+public class BlockletInfo implements Writable {
/**
* Number of rows in this blocklet
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
index 1f45716..0b9ce4b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
@@ -52,7 +52,7 @@ public class DataFileFooter implements Serializable {
/**
* Information about leaf nodes of all columns in this file
*/
- private List<BlockletInfo> blockletList;
+ private transient List<BlockletInfo> blockletList;
/**
* blocklet index of all blocklets in this file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 4deafd4..e515e35 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -80,6 +80,11 @@ public class TableInfo implements Serializable, Writable {
private List<RelationIdentifier> parentRelationIdentifiers;
+ /**
+ * flag to check whether any schema modification operation has happened after creation of table
+ */
+ private boolean isSchemaModified;
+
public TableInfo() {
dataMapSchemaList = new ArrayList<>();
}
@@ -97,6 +102,18 @@ public class TableInfo implements Serializable, Writable {
public void setFactTable(TableSchema factTable) {
this.factTable = factTable;
updateParentRelationIdentifier();
+ updateIsSchemaModified();
+ }
+
+ private void updateIsSchemaModified() {
+ if (null != factTable.getSchemaEvalution()) {
+ // If schema evolution entry list size is > 1 that means an alter operation is performed
+ // which has added the new schema entry in the schema evolution list.
+ // Currently apart from create table schema evolution entries
+ // are getting added only in the alter operations.
+ isSchemaModified =
+ factTable.getSchemaEvalution().getSchemaEvolutionEntryList().size() > 1 ? true : false;
+ }
}
private void updateParentRelationIdentifier() {
@@ -254,6 +271,7 @@ public class TableInfo implements Serializable, Writable {
parentRelationIdentifiers.get(i).write(out);
}
}
+ out.writeBoolean(isSchemaModified);
}
@Override public void readFields(DataInput in) throws IOException {
@@ -288,6 +306,7 @@ public class TableInfo implements Serializable, Writable {
this.parentRelationIdentifiers.add(relationIdentifier);
}
}
+ this.isSchemaModified = in.readBoolean();
}
public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
@@ -316,4 +335,8 @@ public class TableInfo implements Serializable, Writable {
return parentRelationIdentifiers;
}
+ public boolean isSchemaModified() {
+ return isSchemaModified;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 766650d..48bf179 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -18,11 +18,13 @@
package org.apache.carbondata.core.util;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
@@ -39,6 +41,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
@@ -46,8 +49,10 @@ public class BlockletDataMapUtil {
public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
- Set<String> filesRead) throws IOException {
- if (identifier.getMergeIndexFileName() != null) {
+ Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
+ throws IOException {
+ if (identifier.getMergeIndexFileName() != null
+ && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
CarbonFile indexMergeFile = FileFactory.getCarbonFile(
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getMergeIndexFileName());
@@ -68,23 +73,58 @@ public class BlockletDataMapUtil {
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
for (DataFileFooter footer : indexInfo) {
String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
- blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
+ if (null == blockMetaInfoMap.get(blockPath)) {
+ blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
+ }
}
return blockMetaInfoMap;
}
- private static BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
- CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
+ private static BlockMetaInfo createBlockMetaInfo(
+ Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, String carbonDataFile)
+ throws IOException {
+ FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
+ switch (fileType) {
+ case LOCAL:
+ CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile, fileType);
+ return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
+ default:
+ return fileNameToMetaInfoMapping.get(carbonDataFile);
+ }
+ }
+
+ /**
+ * This method will create a mapping from carbondata file path to its BlockMetaInfo. It reduces the
+ * number of namenode calls, since with this method one namenode call will fetch 1000 entries
+ *
+ * @param segmentFilePath
+ * @return
+ * @throws IOException
+ */
+ public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
+ String segmentFilePath) throws IOException {
+ Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
if (carbonFile instanceof AbstractDFSCarbonFile) {
+ Path path = new Path(segmentFilePath);
RemoteIterator<LocatedFileStatus> iter =
- ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
- LocatedFileStatus fileStatus = iter.next();
- String[] location = fileStatus.getBlockLocations()[0].getHosts();
- long len = fileStatus.getLen();
- return new BlockMetaInfo(location, len);
- } else {
- return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
+ ((AbstractDFSCarbonFile) carbonFile).fs.listLocatedStatus(path);
+ PathFilter pathFilter = new PathFilter() {
+ @Override public boolean accept(Path path) {
+ return CarbonTablePath.isCarbonDataFile(path.getName());
+ }
+ };
+ while (iter.hasNext()) {
+ LocatedFileStatus fileStatus = iter.next();
+ if (pathFilter.accept(fileStatus.getPath())) {
+ String[] location = fileStatus.getBlockLocations()[0].getHosts();
+ long len = fileStatus.getLen();
+ BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+ fileNameToMetaInfoMapping.put(fileStatus.getPath().toString(), blockMetaInfo);
+ }
+ }
}
+ return fileNameToMetaInfoMapping;
}
public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment,
@@ -96,7 +136,7 @@ public class BlockletDataMapUtil {
indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
} else {
SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
- indexFiles = fileStore.getIndexFiles();
+ indexFiles = fileStore.getIndexOrMergeFiles();
}
for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
Path indexFile = new Path(indexFileEntry.getKey());
@@ -108,33 +148,51 @@ public class BlockletDataMapUtil {
}
/**
- * This method will filter out the TableBlockIndexUniqueIdentifiers belongs to that distributable
+ * This method will return the TableBlockIndexUniqueIdentifier that matches the given distributable
*
* @param tableBlockIndexUniqueIdentifiers
* @param distributable
* @return
*/
- public static Set<TableBlockIndexUniqueIdentifier> filterIdentifiersBasedOnDistributable(
+ public static TableBlockIndexUniqueIdentifier filterIdentifiersBasedOnDistributable(
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
BlockletDataMapDistributable distributable) {
- Set<TableBlockIndexUniqueIdentifier> validIdentifiers =
- new HashSet<>(tableBlockIndexUniqueIdentifiers.size());
- if (distributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
- tableBlockIndexUniqueIdentifiers) {
- if (null != tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) {
- validIdentifiers.add(tableBlockIndexUniqueIdentifier);
- }
- }
- } else {
- for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
- tableBlockIndexUniqueIdentifiers) {
- if (null == tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) {
- validIdentifiers.add(tableBlockIndexUniqueIdentifier);
- }
+ TableBlockIndexUniqueIdentifier validIdentifier = null;
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+ tableBlockIndexUniqueIdentifiers) {
+ if (distributable.getFilePath().equals(
+ tableBlockIndexUniqueIdentifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR
+ + tableBlockIndexUniqueIdentifier.getIndexFileName())) {
+ validIdentifier = tableBlockIndexUniqueIdentifier;
+ break;
}
}
- return validIdentifiers;
+ return validIdentifier;
+ }
+
+ /**
+ * This method will return the TableBlockIndexUniqueIdentifiers of the index files in a merge index file
+ *
+ * @param identifier
+ * @return
+ * @throws IOException
+ */
+ public static List<TableBlockIndexUniqueIdentifier> getIndexFileIdentifiersFromMergeFile(
+ TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore segmentIndexFileStore)
+ throws IOException {
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+ String mergeFilePath =
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName();
+ segmentIndexFileStore.readMergeFile(mergeFilePath);
+ List<String> indexFiles =
+ segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
+ for (String indexFile : indexFiles) {
+ tableBlockIndexUniqueIdentifiers.add(
+ new TableBlockIndexUniqueIdentifier(identifier.getIndexFilePath(), indexFile,
+ identifier.getIndexFileName(), identifier.getSegmentId()));
+ }
+ return tableBlockIndexUniqueIdentifiers;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index 67c50e5..67fe8fb 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -48,7 +49,7 @@ public class TestBlockletDataMapFactory {
private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
- private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+ private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
@Before public void setUp() {
blockletDataMapFactory = new BlockletDataMapFactory();
@@ -62,23 +63,24 @@ public class TestBlockletDataMapFactory {
@Test public void addDataMapToCache()
throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
- BlockletDataMap dataMap = new BlockletDataMap();
- dataMap.setTableBlockUniqueIdentifier(tableBlockIndexUniqueIdentifier);
- Method method =
- BlockletDataMapFactory.class.getDeclaredMethod("cache", DataMap.class);
+ List<DataMap> dataMaps = new ArrayList<>();
+ Method method = BlockletDataMapFactory.class
+ .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifier.class,
+ BlockletDataMapIndexWrapper.class);
method.setAccessible(true);
- method.invoke(blockletDataMapFactory, dataMap);
- DataMap result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+ method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifier,
+ new BlockletDataMapIndexWrapper(dataMaps));
+ BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
assert null != result;
}
@Test public void getValidDistributables() throws IOException {
BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
- "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex", null);
+ "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
Segment segment = new Segment("0", null);
blockletDataMapDistributable.setSegment(segment);
BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
- "/opt/store/default/carbon_table/Fact/Part0/Segment_0/1521012756710.carbonindexmerge", null);
+ "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");
blockletDataMapDistributable1.setSegment(segment);
List<DataMapDistributable> dataMapDistributables = new ArrayList<>(2);
dataMapDistributables.add(blockletDataMapDistributable);
@@ -89,15 +91,10 @@ public class TestBlockletDataMapFactory {
TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier1 =
new TableBlockIndexUniqueIdentifier(
"/opt/store/default/carbon_table/Fact/Part0/Segment_0",
- "0_batchno0-0-1521012756701.carbonindex", "1521012756710.carbonindexmerge", "0");
- TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier2 =
- new TableBlockIndexUniqueIdentifier(
- "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
- "0_batchno0-0-1521012756702.carbonindex", "1521012756710.carbonindexmerge", "0");
+ "0_batchno0-0-1521012756701.carbonindex", null, "0");
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>(3);
tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier1);
- tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier2);
return tableBlockIndexUniqueIdentifiers;
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 9f67b22..32af8d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -16,9 +16,9 @@
*/
package org.apache.carbondata.hadoop;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,13 +40,14 @@ public class CacheClient {
private static final LogService LOGGER =
LogServiceFactory.getLogService(CacheClient.class.getName());
+ private final Object lock = new Object();
+
// segment access client for driver LRU cache
private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
segmentAccessClient;
private static Map<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>
- segmentProperties =
- new HashMap<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>();
+ segmentProperties = new ConcurrentHashMap<>();
public CacheClient() {
Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
@@ -78,13 +79,18 @@ public class CacheClient {
columnCardinality);
SegmentProperties segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
if (null == segmentProperties) {
- // create a metadata details
- // this will be useful in query handling
- // all the data file metadata will have common segment properties we
- // can use first one to get create the segment properties
- LOGGER.info("Constructing new SegmentProperties");
- segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
- this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+ synchronized (lock) {
+ segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+ if (null == segmentProperties) {
+ // create the metadata details;
+ // this will be useful in query handling.
+ // all the data file metadata will have common segment properties, so we
+ // can use the first one to create the segment properties
+ LOGGER.info("Constructing new SegmentProperties");
+ segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
+ this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+ }
+ }
}
return segmentProperties;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
index 35d4b39..4b57d37 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.hadoop.api;
import java.util.List;
-import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -31,9 +31,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
*/
public abstract class AbstractDataMapJob implements DataMapJob {
- @Override
- public List<DataMap> execute(CarbonTable carbonTable, FileInputFormat<Void, DataMap> format) {
- return null;
+ @Override public void execute(CarbonTable carbonTable,
+ FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
}
@Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0b0d8e65/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index 3e32ecb..d41acec 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.hadoop.api;
import java.io.Serializable;
import java.util.List;
-import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -32,7 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
*/
public interface DataMapJob extends Serializable {
- List<DataMap> execute(CarbonTable carbonTable, FileInputFormat<Void, DataMap> dataMapFormat);
+ void execute(CarbonTable carbonTable,
+ FileInputFormat<Void, BlockletDataMapIndexWrapper> dataMapFormat);
List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
FilterResolverIntf resolverIntf);