You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/04/10 10:59:11 UTC
[2/2] carbondata git commit: [CARBONDATA-2310] Refactored code to improve Distributable interface
[CARBONDATA-2310] Refactored code to improve Distributable interface
Refactored code to improve Distributable interface
This closes #2134
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c48df39
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c48df39
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c48df39
Branch: refs/heads/branch-1.3
Commit: 3c48df396f2bafc9efc8091fc7abefca089922d7
Parents: 31c7b50
Author: dhatchayani <dh...@gmail.com>
Authored: Tue Apr 3 11:19:43 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Apr 10 16:16:27 2018 +0530
----------------------------------------------------------------------
.../org/apache/carbondata/core/cache/Cache.java | 10 ++
.../dictionary/AbstractDictionaryCache.java | 4 +
.../core/constants/CarbonCommonConstants.java | 3 +
.../core/datamap/dev/CacheableDataMap.java | 47 ++++++
.../carbondata/core/datamap/dev/DataMap.java | 3 +-
.../core/datastore/BlockIndexStore.java | 4 +
.../core/datastore/SegmentTaskIndexStore.java | 4 +
.../core/indexstore/AbstractMemoryDMStore.java | 63 +++++++
.../indexstore/BlockletDataMapIndexStore.java | 92 ++++------
.../core/indexstore/SafeMemoryDMStore.java | 94 +++++++++++
.../TableBlockIndexUniqueIdentifier.java | 3 +-
.../core/indexstore/UnsafeMemoryDMStore.java | 23 +--
.../blockletindex/BlockletDataMap.java | 169 +++++++++++++------
.../BlockletDataMapDistributable.java | 18 +-
.../blockletindex/BlockletDataMapFactory.java | 95 +++++++----
.../blockletindex/BlockletDataMapModel.java | 13 ++
.../core/indexstore/row/DataMapRow.java | 13 +-
.../core/indexstore/row/UnsafeDataMapRow.java | 7 +-
.../core/indexstore/schema/CarbonRowSchema.java | 4 +-
.../core/util/BlockletDataMapUtil.java | 140 +++++++++++++++
.../carbondata/core/util/SessionParams.java | 5 +
.../TestBlockletDataMapFactory.java | 108 ++++++++++++
.../apache/carbondata/hadoop/CacheClient.java | 43 +++++
.../hadoop/api/AbstractDataMapJob.java | 43 +++++
.../hadoop/api/CarbonTableInputFormat.java | 61 ++++++-
.../carbondata/hadoop/api/DataMapJob.java | 6 +
.../hadoop/util/CarbonInputFormatUtil.java | 44 +++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 9 +-
.../carbondata/spark/rdd/SparkDataMapJob.scala | 4 +-
.../org/apache/spark/sql/CarbonCountStar.scala | 13 ++
.../execution/command/CarbonHiveCommands.scala | 9 +
31 files changed, 973 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
index 04fa18a..6df36fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.core.cache;
import java.io.IOException;
import java.util.List;
+import org.apache.carbondata.core.memory.MemoryException;
+
/**
* A semi-persistent mapping from keys to values. Cache entries are manually added using
* #get(Key), #getAll(List<Keys>) , and are stored in the cache until
@@ -69,6 +71,14 @@ public interface Cache<K, V> {
void invalidate(K key);
/**
+   * Adds the given value to the cache under the given key.
+   *
+   * <p>Not every cache supports this operation; implementations that do not support it throw
+   * {@link UnsupportedOperationException}.
+   *
+   * @param key   key under which the value is stored
+   * @param value value to store in the cache
+   * @throws IOException     if the value cannot be added to the cache
+   * @throws MemoryException if an implementation cannot obtain the memory needed to cache
+   *                         the value
+   */
+  void put(K key, V value) throws IOException, MemoryException;
+
+ /**
* Access count of Cacheable entry will be decremented
*
* @param keys
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index 598d00e..9ed9007 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -59,6 +59,10 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
initThreadPoolSize();
}
+  /**
+   * Direct insertion is not supported by the dictionary cache.
+   *
+   * @param key   ignored
+   * @param value ignored
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
/**
* This method will initialize the thread pool size to be used for creating the
* max number of threads for a job
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 605d711..7bdea4d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1587,6 +1587,9 @@ public final class CarbonCommonConstants {
// default value is 2 days
public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT = "48";
+ // Property to enable parallel datamap loading for a table.
+ // NOTE(review): the value ends with '.', so it looks like a key prefix to which a
+ // table-specific suffix is appended by callers - TODO confirm against SessionParams usage.
+ public static final String CARBON_LOAD_DATAMAPS_PARALLEL = "carbon.load.datamaps.parallel.";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
new file mode 100644
index 0000000..885b21f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * Interface for data map caching: lets callers add an already loaded {@link DataMap} to the
+ * cache and find out which distributables are not cached yet.
+ */
+public interface CacheableDataMap {
+
+  /**
+   * Add the dataMap to cache.
+   *
+   * @param dataMap datamap instance to add to the cache
+   * @throws IOException     if adding the datamap to the cache fails
+   * @throws MemoryException if memory required while caching cannot be obtained
+   */
+  void cache(DataMap dataMap) throws IOException, MemoryException;
+
+  /**
+   * Get all the uncached distributables from the list.
+   *
+   * @param distributables candidate distributables to check against the cache
+   * @return the entries of {@code distributables} whose datamaps are not present in the cache
+   * @throws IOException if the cache state cannot be determined
+   */
+  List<DataMapDistributable> getAllUncachedDistributables(
+      List<DataMapDistributable> distributables) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index f3642d6..f096dd7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.datamap.dev;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import org.apache.carbondata.core.indexstore.Blocklet;
@@ -27,7 +28,7 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
/**
* Datamap is an entity which can store and retrieve index data.
*/
-public interface DataMap {
+public interface DataMap extends Serializable {
/**
* It is called to load the data map to memory or to initialize it.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
index f2c38fa..609bf1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
@@ -229,6 +229,10 @@ public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
.remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
}
+  /**
+   * Direct insertion is not supported by the block index store.
+   *
+   * @param key   ignored
+   * @param value ignored
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void put(TableBlockUniqueIdentifier key, AbstractIndex value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
@Override public void clearAccessCount(List<TableBlockUniqueIdentifier> keys) {
for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : keys) {
SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 8ed5c18..744cc93 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -140,6 +140,10 @@ public class SegmentTaskIndexStore
lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}
+  /**
+   * Direct insertion is not supported by the segment task index store.
+   *
+   * @param key   ignored
+   * @param value ignored
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
/**
* returns block timestamp value from the given task
* @param taskKey
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
new file mode 100644
index 0000000..41b929a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+
+/**
+ * Base class for stores that hold the {@link DataMapRow}s of a datamap. Concrete stores keep the
+ * rows on heap ({@link SafeMemoryDMStore}) or in unsafe memory ({@link UnsafeMemoryDMStore}).
+ */
+public abstract class AbstractMemoryDMStore implements Serializable {
+
+  // explicit UID, consistent with the other serializable datamap classes
+  private static final long serialVersionUID = 6827093846813423475L;
+
+  /** true once this store's memory has been released by {@link #freeMemory()} */
+  protected boolean isMemoryFreed;
+
+  /** schema of the rows kept in this store */
+  protected CarbonRowSchema[] schema;
+
+  /**
+   * Id of the task that created this store, captured from {@link ThreadLocalTaskInfo} at
+   * construction time (used by subclasses, e.g. for unsafe memory allocation).
+   */
+  protected final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
+  public AbstractMemoryDMStore(CarbonRowSchema[] schema) {
+    this.schema = schema;
+  }
+
+  /**
+   * Appends a row to the store.
+   *
+   * @param indexRow row to add
+   * @throws MemoryException if memory for the row cannot be obtained
+   */
+  public abstract void addIndexRow(DataMapRow indexRow) throws MemoryException;
+
+  /**
+   * Returns the row stored at the given 0-based position.
+   *
+   * @param index position of the row, must be less than {@link #getRowCount()}
+   * @return the stored row
+   */
+  public abstract DataMapRow getDataMapRow(int index);
+
+  /**
+   * Releases the memory held by this store.
+   */
+  public abstract void freeMemory();
+
+  /**
+   * @return number of bytes used by the rows added to this store
+   */
+  public abstract int getMemoryUsed();
+
+  /**
+   * @return schema of the rows kept in this store
+   */
+  public CarbonRowSchema[] getSchema() {
+    return schema;
+  }
+
+  /**
+   * @return number of rows added to this store
+   */
+  public abstract int getRowCount();
+
+  /**
+   * Called once all rows have been added. The default implementation does nothing.
+   *
+   * @throws MemoryException if an implementation fails to finalize its memory
+   */
+  public void finishWriting() throws MemoryException {
+    // do nothing in default implementation
+  }
+
+  /**
+   * Copies the rows of this store into a new {@link UnsafeMemoryDMStore}.
+   * Not supported by default.
+   *
+   * @return a new unsafe store holding the same rows
+   * @throws MemoryException if unsafe memory cannot be allocated
+   * @throws UnsupportedOperationException if this store does not support conversion
+   */
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    throw new UnsupportedOperationException("Operation not allowed");
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index ac14105..ce0fe8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.indexstore;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -31,19 +30,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
/**
* Class to handle loading, unloading,clearing,storing of the table
@@ -86,7 +77,7 @@ public class BlockletDataMapIndexStore
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
Set<String> filesRead = new HashSet<>();
Map<String, BlockMetaInfo> blockMetaInfoMap =
- getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
+ BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
} catch (MemoryException e) {
LOGGER.error("memory exception when loading datamap: " + e.getMessage());
@@ -96,54 +87,6 @@ public class BlockletDataMapIndexStore
return dataMap;
}
- private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
- SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException {
- if (identifier.getMergeIndexFileName() != null) {
- CarbonFile indexMergeFile = FileFactory.getCarbonFile(
- identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
- .getMergeIndexFileName());
- if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
- indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
- filesRead.add(indexMergeFile.getPath());
- }
- }
- if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
- indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
- identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
- .getIndexFileName()) });
- }
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
- Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
- List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
- identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
- .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
- for (DataFileFooter footer : indexInfo) {
- String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
- if (FileFactory.isFileExist(blockPath)) {
- blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
- } else {
- LOGGER.warn("Skipping invalid block " + footer.getBlockInfo().getBlockUniqueName()
- + " The block does not exist. The block might be got deleted due to clean up post"
- + " update/delete operation over table.");
- }
- }
- return blockMetaInfoMap;
- }
-
- private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
- CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
- if (carbonFile instanceof AbstractDFSCarbonFile) {
- RemoteIterator<LocatedFileStatus> iter =
- ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
- LocatedFileStatus fileStatus = iter.next();
- String[] location = fileStatus.getBlockLocations()[0].getHosts();
- long len = fileStatus.getLen();
- return new BlockMetaInfo(location, len);
- } else {
- return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
- }
- }
-
@Override
public List<BlockletDataMap> getAll(
List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
@@ -165,7 +108,7 @@ public class BlockletDataMapIndexStore
Set<String> filesRead = new HashSet<>();
for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
Map<String, BlockMetaInfo> blockMetaInfoMap =
- getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
+ BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
blockletDataMaps.add(
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
}
@@ -206,6 +149,35 @@ public class BlockletDataMapIndexStore
lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}
+  /**
+   * Adds an already loaded {@link BlockletDataMap} to the LRU cache.
+   *
+   * <p>An existing entry is never overwritten: if the key is already cached, the call does
+   * nothing and the supplied datamap is left untouched (the caller keeps its memory). The
+   * presence check is repeated while holding the per-segment lock, so concurrent callers for
+   * the same key cannot both insert. Before insertion the datamap is converted to an unsafe
+   * store; if conversion or insertion fails, the datamap's memory is released via
+   * {@link BlockletDataMap#clear()} and the failure is rethrown as an {@link IOException}.
+   *
+   * <p>To replace an existing entry, invalidate it first and then call put.
+   *
+   * @param tableBlockIndexUniqueIdentifier key identifying the index file
+   * @param blockletDataMap datamap to cache
+   * @throws IOException if converting or caching the datamap fails
+   * @throws MemoryException declared by {@link Cache#put}; conversion failures are reported
+   *                         as IOException instead
+   */
+  @Override public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMap blockletDataMap) throws IOException, MemoryException {
+    String uniqueTableSegmentIdentifier =
+        tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
+    Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+    if (lock == null) {
+      lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+    }
+    // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry
+    // as in that case clearing unsafe memory need to be taken care. If at all datamap entry
+    // in the cache need to be overwritten then use the invalidate interface
+    // and then use the put interface
+    if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+      synchronized (lock) {
+        // re-check under the lock: another thread may have cached the same key meanwhile
+        if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+          try {
+            blockletDataMap.convertToUnsafeDMStore();
+            // NOTE(review): the result of lruCache.put is ignored; if it can report that the
+            // entry was not added, the datamap's memory may need releasing here - confirm
+            // against CarbonLRUCache.
+            lruCache.put(uniqueTableSegmentIdentifier, blockletDataMap,
+                blockletDataMap.getMemorySize());
+          } catch (Throwable e) {
+            // clear all the memory acquired by data map in case of any failure
+            blockletDataMap.clear();
+            throw new IOException("Problem in adding datamap to cache.", e);
+          }
+        }
+      }
+    }
+  }
+
/**
* Below method will be used to load the segment of segments
* One segment may have multiple task , so table segment will be loaded
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
new file mode 100644
index 0000000..d51f28d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * Store the data map row {@link DataMapRow} data on heap.
+ *
+ * <p>After {@link #freeMemory()} the row list is released, so {@link #getRowCount()},
+ * {@link #getDataMapRow(int)} and {@link #convertToUnsafeDMStore()} must not be called anymore.
+ */
+public class SafeMemoryDMStore extends AbstractMemoryDMStore {
+
+  // explicit UID, consistent with UnsafeMemoryDMStore
+  private static final long serialVersionUID = -8131045218451023712L;
+
+  /**
+   * holds all blocklets metadata in memory; set to null by freeMemory()
+   */
+  private List<DataMapRow> dataMapRows =
+      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  /** sum of getTotalSizeInBytes() over all rows added so far */
+  private int runningLength;
+
+  public SafeMemoryDMStore(CarbonRowSchema[] schema) {
+    super(schema);
+  }
+
+  /**
+   * Add the index row to dataMapRows, basically to in memory.
+   *
+   * @param indexRow row to add
+   */
+  @Override
+  public void addIndexRow(DataMapRow indexRow) throws MemoryException {
+    dataMapRows.add(indexRow);
+    runningLength += indexRow.getTotalSizeInBytes();
+  }
+
+  @Override
+  public DataMapRow getDataMapRow(int index) {
+    assert (index < dataMapRows.size());
+    return dataMapRows.get(index);
+  }
+
+  @Override
+  public void freeMemory() {
+    if (!isMemoryFreed) {
+      if (null != dataMapRows) {
+        dataMapRows.clear();
+        dataMapRows = null;
+      }
+      isMemoryFreed = true;
+    }
+  }
+
+  @Override
+  public int getMemoryUsed() {
+    return runningLength;
+  }
+
+  @Override
+  public int getRowCount() {
+    return dataMapRows.size();
+  }
+
+  /**
+   * Copies all rows into a new {@link UnsafeMemoryDMStore}. If copying fails, the unsafe memory
+   * allocated for the new store is released before the exception propagates.
+   *
+   * @return a new unsafe store holding the same rows
+   * @throws MemoryException if unsafe memory cannot be allocated
+   */
+  @Override
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema);
+    boolean converted = false;
+    try {
+      for (DataMapRow dataMapRow : dataMapRows) {
+        unsafeMemoryDMStore.addIndexRow(dataMapRow);
+      }
+      unsafeMemoryDMStore.finishWriting();
+      converted = true;
+      return unsafeMemoryDMStore;
+    } finally {
+      if (!converted) {
+        // the half-filled store is not referenced by anyone else; free it to avoid a leak
+        unsafeMemoryDMStore.freeMemory();
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index c907fa8..8118fe4 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.indexstore;
+import java.io.Serializable;
import java.util.Objects;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -24,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
/**
* Class holds the indexFile information to uniquely identitify the carbon index
*/
-public class TableBlockIndexUniqueIdentifier {
+public class TableBlockIndexUniqueIdentifier implements Serializable {
private String indexFilePath;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 31ecac2..6fe7fd2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
@@ -32,9 +31,11 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
/**
* Store the data map row @{@link DataMapRow} data to unsafe.
*/
-public class UnsafeMemoryDMStore {
+public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
- private MemoryBlock memoryBlock;
+ private static final long serialVersionUID = -5344592407101055335L;
+
+ private transient MemoryBlock memoryBlock;
private static int capacity = 8 * 1024;
@@ -42,18 +43,12 @@ public class UnsafeMemoryDMStore {
private int runningLength;
- private boolean isMemoryFreed;
-
- private CarbonRowSchema[] schema;
-
private int[] pointers;
private int rowCount;
- private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-
public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException {
- this.schema = schema;
+ super(schema);
this.allocatedSize = capacity;
this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
this.pointers = new int[1000];
@@ -92,7 +87,7 @@ public class UnsafeMemoryDMStore {
* @param indexRow
* @return
*/
- public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
+ public void addIndexRow(DataMapRow indexRow) throws MemoryException {
// First calculate the required memory to keep the row in unsafe
int rowSize = indexRow.getTotalSizeInBytes();
// Check whether allocated memory is sufficient or not.
@@ -172,7 +167,7 @@ public class UnsafeMemoryDMStore {
}
}
- public UnsafeDataMapRow getUnsafeRow(int index) {
+ public DataMapRow getDataMapRow(int index) {
assert (index < rowCount);
return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
}
@@ -205,10 +200,6 @@ public class UnsafeMemoryDMStore {
return runningLength;
}
- public CarbonRowSchema[] getSchema() {
- return schema;
- }
-
public int getRowCount() {
return rowCount;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 9ec7a46..66fa0aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -39,11 +39,14 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -82,6 +85,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
public static final String NAME = "clustered.btree.blocklet";
+ private static final long serialVersionUID = 4121938766748899140L;
+
private static int KEY_INDEX = 0;
private static int MIN_VALUES_INDEX = 1;
@@ -120,14 +125,17 @@ public class BlockletDataMap implements DataMap, Cacheable {
private static int SEGMENTID = 5;
- private UnsafeMemoryDMStore unsafeMemoryDMStore;
+ private AbstractMemoryDMStore memoryDMStore;
- private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
+ private AbstractMemoryDMStore summaryDMStore;
- private SegmentProperties segmentProperties;
+ // As it is a heavy object it is not recommended to serialize this object
+ private transient SegmentProperties segmentProperties;
private int[] columnCardinality;
+ private TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier;
+
@Override
public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
long startTime = System.currentTimeMillis();
@@ -153,9 +161,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
schemaBinary = convertSchemaToBinary(columnInTable);
columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
- createSchema(segmentProperties);
+ createSchema(segmentProperties,
+ ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
- segmentId);
+ segmentId, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
}
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
@@ -186,20 +195,20 @@ public class BlockletDataMap implements DataMap, Cacheable {
}
}
}
- if (unsafeMemoryDMStore != null) {
- unsafeMemoryDMStore.finishWriting();
+ if (memoryDMStore != null) {
+ memoryDMStore.finishWriting();
}
- if (null != unsafeMemorySummaryDMStore) {
+ if (null != summaryDMStore) {
addTaskSummaryRowToUnsafeMemoryStore(
summaryRow,
schemaBinary,
filePath,
fileName,
segmentId);
- unsafeMemorySummaryDMStore.finishWriting();
+ summaryDMStore.finishWriting();
}
LOGGER.info(
- "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + (
+ "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is " + (
System.currentTimeMillis() - startTime));
}
@@ -208,10 +217,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
int[] minMaxLen = segmentProperties.getColumnsValueSize();
List<BlockletInfo> blockletList = fileFooter.getBlockletList();
- CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+ CarbonRowSchema[] schema = memoryDMStore.getSchema();
// Add one row to maintain task level min max for segment pruning
if (!blockletList.isEmpty() && summaryRow == null) {
- summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+ summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
}
for (int index = 0; index < blockletList.size(); index++) {
DataMapRow row = new DataMapRowImpl(schema);
@@ -227,7 +236,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
// compute and set task level min values
addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+ summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
TASK_MIN_VALUES_INDEX, true);
ordinal++;
taskMinMaxOrdinal++;
@@ -235,7 +244,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
// compute and set task level max values
addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+ summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
TASK_MAX_VALUES_INDEX, false);
ordinal++;
@@ -270,7 +279,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
row.setShort((short) relativeBlockletId++, ordinal++);
// Store block size
row.setLong(blockMetaInfo.getSize(), ordinal);
- unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ memoryDMStore.addIndexRow(row);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -296,10 +305,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
BlockMetaInfo blockMetaInfo) {
int[] minMaxLen = segmentProperties.getColumnsValueSize();
BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
- CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+ CarbonRowSchema[] schema = memoryDMStore.getSchema();
// Add one row to maintain task level min max for segment pruning
if (summaryRow == null) {
- summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+ summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
}
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
@@ -318,14 +327,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal);
// compute and set task level min values
addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
+ summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
TASK_MIN_VALUES_INDEX, true);
ordinal++;
taskMinMaxOrdinal++;
row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal);
// compute and set task level max values
addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
+ summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
TASK_MAX_VALUES_INDEX, false);
ordinal++;
@@ -358,7 +367,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
// store block size
row.setLong(blockMetaInfo.getSize(), ordinal);
- unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ memoryDMStore.addIndexRow(row);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -379,7 +388,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
summaryRow.setByteArray(segmentId, SEGMENTID);
try {
- unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
+ summaryDMStore.addIndexRow(summaryRow);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -517,7 +526,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
taskMinMaxRow.setRow(row, ordinal);
}
- private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
+ private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe)
+ throws MemoryException {
List<CarbonRowSchema> indexSchemas = new ArrayList<>();
// Index key
@@ -554,8 +564,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
// for storing block length.
indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
- unsafeMemoryDMStore =
- new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
+ CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+ memoryDMStore = getMemoryDMStore(schema, addToUnsafe);
}
/**
@@ -566,7 +576,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
* @throws MemoryException
*/
private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
- byte[] filePath, byte[] fileName, byte[] segmentId)
+ byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe)
throws MemoryException {
List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
@@ -582,8 +592,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
// for storing segmentid
taskMinMaxSchemas.add(
new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
- unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
- taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+ CarbonRowSchema[] schema =
+ taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
+ summaryDMStore = getMemoryDMStore(schema, addToUnsafe);
}
private void getMinMaxSchema(SegmentProperties segmentProperties,
@@ -612,8 +623,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
public boolean isScanRequired(FilterResolverIntf filterExp) {
FilterExecuter filterExecuter =
FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
- for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
+ for (int i = 0; i < summaryDMStore.getRowCount(); i++) {
+ DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i);
boolean isScanRequired = FilterExpressionProcessor
.isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
@@ -626,25 +637,25 @@ public class BlockletDataMap implements DataMap, Cacheable {
@Override
public List<Blocklet> prune(FilterResolverIntf filterExp) {
- if (unsafeMemoryDMStore.getRowCount() == 0) {
+ if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
List<Blocklet> blocklets = new ArrayList<>();
if (filterExp == null) {
- int rowCount = unsafeMemoryDMStore.getRowCount();
+ int rowCount = memoryDMStore.getRowCount();
for (int i = 0; i < rowCount; i++) {
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
+ DataMapRow safeRow = memoryDMStore.getDataMapRow(i).convertToSafeRow();
blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
}
} else {
// Remove B-tree jump logic as start and end key prepared is not
// correct for old store scenarios
int startIndex = 0;
- int endIndex = unsafeMemoryDMStore.getRowCount();
+ int endIndex = memoryDMStore.getRowCount();
FilterExecuter filterExecuter =
FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
while (startIndex < endIndex) {
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
+ DataMapRow safeRow = memoryDMStore.getDataMapRow(startIndex).convertToSafeRow();
int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
@@ -663,7 +674,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
@Override
public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
- if (unsafeMemoryDMStore.getRowCount() == 0) {
+ if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
// if it has partitioned datamap but there is no partitioned information stored, it means
@@ -736,7 +747,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
int index = Integer.parseInt(blockletId);
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
+ DataMapRow safeRow = memoryDMStore.getDataMapRow(index).convertToSafeRow();
return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
}
@@ -787,7 +798,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
private String[] getFileDetails() {
try {
String[] fileDetails = new String[3];
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+ DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
fileDetails[0] =
new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET);
fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
@@ -811,14 +822,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
int childNodeIndex;
int low = 0;
- int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int high = memoryDMStore.getRowCount() - 1;
int mid = 0;
int compareRes = -1;
//
while (low <= high) {
mid = (low + high) >>> 1;
// compare the entries
- compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
if (compareRes < 0) {
high = mid - 1;
} else if (compareRes > 0) {
@@ -827,7 +838,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
// if key is matched then get the first entry
int currentPos = mid;
while (currentPos - 1 >= 0
- && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+ && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos - 1)) == 0) {
currentPos--;
}
mid = currentPos;
@@ -859,14 +870,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
int childNodeIndex;
int low = 0;
- int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int high = memoryDMStore.getRowCount() - 1;
int mid = 0;
int compareRes = -1;
//
while (low <= high) {
mid = (low + high) >>> 1;
// compare the entries
- compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
if (compareRes < 0) {
high = mid - 1;
} else if (compareRes > 0) {
@@ -874,8 +885,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
} else {
int currentPos = mid;
// if key is matched then get the first entry
- while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
- && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+ while (currentPos + 1 < memoryDMStore.getRowCount()
+ && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos + 1)) == 0) {
currentPos++;
}
mid = currentPos;
@@ -903,13 +914,13 @@ public class BlockletDataMap implements DataMap, Cacheable {
buffer.putInt(key.getNoDictionaryKeys().length);
buffer.put(key.getDictionaryKeys());
buffer.put(key.getNoDictionaryKeys());
- DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+ DataMapRowImpl dataMapRow = new DataMapRowImpl(memoryDMStore.getSchema());
dataMapRow.setByteArray(buffer.array(), 0);
return dataMapRow;
}
private byte[] getColumnSchemaBinary() {
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+ DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
return unsafeRow.getByteArray(SCHEMA);
}
@@ -933,15 +944,15 @@ public class BlockletDataMap implements DataMap, Cacheable {
@Override
public void clear() {
- if (unsafeMemoryDMStore != null) {
- unsafeMemoryDMStore.freeMemory();
- unsafeMemoryDMStore = null;
+ if (memoryDMStore != null) {
+ memoryDMStore.freeMemory();
+ memoryDMStore = null;
segmentProperties = null;
}
// clear task min/max unsafe memory
- if (null != unsafeMemorySummaryDMStore) {
- unsafeMemorySummaryDMStore.freeMemory();
- unsafeMemorySummaryDMStore = null;
+ if (null != summaryDMStore) {
+ summaryDMStore.freeMemory();
+ summaryDMStore = null;
}
}
@@ -958,13 +969,59 @@ public class BlockletDataMap implements DataMap, Cacheable {
@Override
public long getMemorySize() {
long memoryUsed = 0L;
- if (unsafeMemoryDMStore != null) {
- memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+ if (memoryDMStore != null) {
+ memoryUsed += memoryDMStore.getMemoryUsed();
}
- if (null != unsafeMemorySummaryDMStore) {
- memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
+ if (null != summaryDMStore) {
+ memoryUsed += summaryDMStore.getMemoryUsed();
}
return memoryUsed;
}
+ /**
+ * Returns the index identifier associated with this datamap via
+ * {@link #setTableBlockUniqueIdentifier}; null unless it has been set.
+ * BlockletDataMapFactory#cache uses this value as the cache key.
+ */
+ public TableBlockIndexUniqueIdentifier getTableBlockUniqueIdentifier() {
+ return tableBlockUniqueIdentifier;
+ }
+
+ /**
+ * Associates an index identifier with this datamap (used as its key when cached).
+ *
+ * @param tableBlockUniqueIdentifier identifier to associate with this datamap
+ */
+ public void setTableBlockUniqueIdentifier(
+ TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier) {
+ this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier;
+ }
+
+ /**
+ * Replaces the segment properties used for filter evaluation and min/max sizing.
+ * The field is transient and is not serialized. NOTE(review): this setter presumably
+ * exists to restore it after deserialization - confirm with callers.
+ *
+ * @param segmentProperties segment properties to use
+ */
+ public void setSegmentProperties(SegmentProperties segmentProperties) {
+ this.segmentProperties = segmentProperties;
+ }
+
+ /**
+ * Returns the column cardinality read from the index file footer in init().
+ * The internal array is returned directly, not a copy.
+ */
+ public int[] getColumnCardinality() {
+ return columnCardinality;
+ }
+
+ /**
+ * Creates the row store that backs this datamap for the given schema.
+ *
+ * @param schema row schema of the store to create
+ * @param addToUnsafe true to create an UnsafeMemoryDMStore, false for a SafeMemoryDMStore
+ * @return the newly created store
+ * @throws MemoryException if the store cannot be created
+ */
+ private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe)
+ throws MemoryException {
+ if (!addToUnsafe) {
+ return new SafeMemoryDMStore(schema);
+ }
+ return new UnsafeMemoryDMStore(schema);
+ }
+
+ /**
+ * Converts the index row store and the summary row store from safe to unsafe memory
+ * stores. A store that is not a SafeMemoryDMStore (including a null store) is left as is.
+ * The index store is converted before the summary store.
+ *
+ * @throws MemoryException if an unsafe store cannot be created
+ */
+ public void convertToUnsafeDMStore() throws MemoryException {
+ memoryDMStore = toUnsafeDMStore(memoryDMStore);
+ summaryDMStore = toUnsafeDMStore(summaryDMStore);
+ }
+
+ /**
+ * Returns an unsafe copy of the given store when it is a SafeMemoryDMStore, freeing the
+ * safe store only after the copy succeeded; otherwise returns the store unchanged.
+ */
+ private static AbstractMemoryDMStore toUnsafeDMStore(AbstractMemoryDMStore store)
+ throws MemoryException {
+ // instanceof is false for null, so a missing store is returned untouched
+ if (!(store instanceof SafeMemoryDMStore)) {
+ return store;
+ }
+ UnsafeMemoryDMStore unsafeStore = store.convertToUnsafeDMStore();
+ store.freeMemory();
+ return unsafeStore;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 99e48a5..02ac8d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -16,7 +16,10 @@
*/
package org.apache.carbondata.core.indexstore.blockletindex;
+import java.util.Set;
+
import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
/**
* This class contains required information to make the Blocklet datamap distributable.
@@ -31,11 +34,24 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
*/
private String filePath;
- public BlockletDataMapDistributable(String indexFilePath) {
+ private Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers;
+
+ public BlockletDataMapDistributable(String indexFilePath,
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers) {
this.filePath = indexFilePath;
+ this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
}
public String getFilePath() {
return filePath;
}
+
+ /**
+ * Returns the index file identifiers attached to this distributable; may be null
+ * (for example, the constructor is called with null in BlockletDataMapFactory#toDistributable).
+ */
+ public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers() {
+ return tableBlockIndexUniqueIdentifiers;
+ }
+
+ /**
+ * Replaces the index file identifiers attached to this distributable.
+ *
+ * @param tableBlockIndexUniqueIdentifiers identifiers to attach; stored as given, not copied
+ */
+ public void setTableBlockIndexUniqueIdentifiers(
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers) {
+ this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 5eb077f..c08c87e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -21,13 +21,16 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -37,8 +40,10 @@ import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
@@ -50,12 +55,13 @@ import org.apache.hadoop.fs.RemoteIterator;
/**
* Table map for blocklet
*/
-public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher {
+public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher,
+ CacheableDataMap {
private AbsoluteTableIdentifier identifier;
// segmentId -> list of index file
- private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+ private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
@@ -73,33 +79,47 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
@Override
public List<DataMap> getDataMaps(Segment segment) throws IOException {
+ Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment);
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
- getTableBlockIndexUniqueIdentifiers(segment);
+ new ArrayList<>(identifiers.size());
+ tableBlockIndexUniqueIdentifiers.addAll(identifiers);
return cache.getAll(tableBlockIndexUniqueIdentifiers);
}
- private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+ /**
+ * Adds the given datamap to the blocklet datamap cache, keyed by its
+ * TableBlockIndexUniqueIdentifier. The datamap must be a BlockletDataMap.
+ */
+ @Override
+ public void cache(DataMap dataMap) throws IOException, MemoryException {
+ TableBlockIndexUniqueIdentifier key =
+ ((BlockletDataMap) dataMap).getTableBlockUniqueIdentifier();
+ cache.put(key, dataMap);
+ }
+
+ /**
+ * Keeps only the distributables that still need loading. A distributable is kept when at
+ * least one of its segment's index identifiers that match it has no entry in the cache.
+ *
+ * @param distributables candidate distributables; each is cast to BlockletDataMapDistributable
+ * @return the distributables with at least one uncached identifier, in input order
+ * @throws IOException if the segment's index identifiers cannot be read
+ */
+ @Override
+ public List<DataMapDistributable> getAllUncachedDistributables(
+ List<DataMapDistributable> distributables) throws IOException {
+ List<DataMapDistributable> uncached = new ArrayList<>(distributables.size());
+ for (DataMapDistributable candidate : distributables) {
+ Set<TableBlockIndexUniqueIdentifier> segmentIdentifiers =
+ getTableBlockIndexUniqueIdentifiers(candidate.getSegment());
+ // narrow the segment's identifiers to the ones this distributable covers
+ Set<TableBlockIndexUniqueIdentifier> coveredIdentifiers = BlockletDataMapUtil
+ .filterIdentifiersBasedOnDistributable(segmentIdentifiers,
+ (BlockletDataMapDistributable) candidate);
+ if (hasUncachedIdentifier(coveredIdentifiers)) {
+ uncached.add(candidate);
+ }
+ }
+ return uncached;
+ }
+
+ /**
+ * Returns true as soon as one of the given identifiers has no entry in the cache.
+ */
+ private boolean hasUncachedIdentifier(Set<TableBlockIndexUniqueIdentifier> identifiers) {
+ for (TableBlockIndexUniqueIdentifier id : identifiers) {
+ if (cache.getIfPresent(id) == null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
Segment segment) throws IOException {
- List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segment.getSegmentNo());
if (tableBlockIndexUniqueIdentifiers == null) {
- tableBlockIndexUniqueIdentifiers = new ArrayList<>();
- Map<String, String> indexFiles;
- if (segment.getSegmentFileName() == null) {
- String path =
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
- indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
- } else {
- SegmentFileStore fileStore =
- new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
- indexFiles = fileStore.getIndexFiles();
- }
- for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
- Path indexFile = new Path(indexFileEntry.getKey());
- tableBlockIndexUniqueIdentifiers.add(
- new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
- indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo()));
- }
+ tableBlockIndexUniqueIdentifiers =
+ BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment, identifier.getTablePath());
segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
}
return tableBlockIndexUniqueIdentifiers;
@@ -121,7 +141,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
}
return detailedBlocklets;
}
- List<TableBlockIndexUniqueIdentifier> identifiers =
+ Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
// Retrieve each blocklets detail information from blocklet datamap
for (Blocklet blocklet : blocklets) {
@@ -136,12 +156,12 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
if (blocklet instanceof ExtendedBlocklet) {
return (ExtendedBlocklet) blocklet;
}
- List<TableBlockIndexUniqueIdentifier> identifiers =
+ Set<TableBlockIndexUniqueIdentifier> identifiers =
getTableBlockIndexUniqueIdentifiers(segment);
return getExtendedBlocklet(identifiers, blocklet);
}
- private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
+ private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers,
Blocklet blocklet) throws IOException {
String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
@@ -156,6 +176,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
@Override
public List<DataMapDistributable> toDistributable(Segment segment) {
List<DataMapDistributable> distributables = new ArrayList<>();
+ Map<String, String> indexFiles = null;
try {
CarbonFile[] carbonIndexFiles;
if (segment.getSegmentFileName() == null) {
@@ -164,11 +185,20 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
} else {
SegmentFileStore fileStore =
new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
- Map<String, String> indexFiles = fileStore.getIndexFiles();
+ indexFiles = fileStore.getIndexFiles();
carbonIndexFiles = new CarbonFile[indexFiles.size()];
int i = 0;
- for (String indexFile : indexFiles.keySet()) {
- carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
+ for (Map.Entry<String, String> entry : indexFiles.entrySet()) {
+ String indexFile = entry.getKey();
+ String mergeFileName = entry.getValue();
+ if (null != mergeFileName) {
+ String mergeIndexPath = indexFile
+ .substring(0, indexFile.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1)
+ + mergeFileName;
+ carbonIndexFiles[i++] = FileFactory.getCarbonFile(mergeIndexPath);
+ } else {
+ carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
+ }
}
}
for (int i = 0; i < carbonIndexFiles.length; i++) {
@@ -179,7 +209,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
LocatedFileStatus fileStatus = iter.next();
String[] location = fileStatus.getBlockLocations()[0].getHosts();
BlockletDataMapDistributable distributable =
- new BlockletDataMapDistributable(path.toString());
+ new BlockletDataMapDistributable(path.toString(), null);
distributable.setLocations(location);
distributables.add(distributable);
@@ -197,7 +227,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
@Override
public void clear(Segment segment) {
- List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
+ Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
if (blockIndexes != null) {
for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
DataMap dataMap = cache.getIfPresent(blockIndex);
@@ -251,4 +281,9 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
// TODO: pass SORT_COLUMNS into this class
return null;
}
+
+ /**
+ * Returns the segmentId -> index identifiers map held by this factory. This is the live
+ * internal map, not a copy, so changes made through it are visible to the factory.
+ */
+ public Map<String, Set<TableBlockIndexUniqueIdentifier>> getSegmentMap() {
+ return segmentMap;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index ebeb278..17b2463 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -32,6 +32,8 @@ public class BlockletDataMapModel extends DataMapModel {
private String segmentId;
+ private boolean addToUnsafe = true;
+
public BlockletDataMapModel(String filePath, byte[] fileData,
Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
super(filePath);
@@ -40,6 +42,13 @@ public class BlockletDataMapModel extends DataMapModel {
this.segmentId = segmentId;
}
+ /**
+ * Same as the four-argument constructor, but also chooses the row store type.
+ *
+ * @param addToUnsafe true to build the datamap into UnsafeMemoryDMStore instances, false
+ * for SafeMemoryDMStore; the four-argument constructor leaves this at true
+ */
+ public BlockletDataMapModel(String filePath, byte[] fileData,
+ Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, boolean addToUnsafe) {
+ this(filePath, fileData, blockMetaInfoMap, segmentId);
+ this.addToUnsafe = addToUnsafe;
+ }
+
+
public byte[] getFileData() {
return fileData;
}
@@ -51,4 +60,8 @@ public class BlockletDataMapModel extends DataMapModel {
public String getSegmentId() {
return segmentId;
}
+
+ /**
+ * Returns whether the datamap should be built into unsafe memory stores (defaults to true).
+ */
+ public boolean isAddToUnsafe() {
+ return addToUnsafe;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index b764bdf..496a1d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -16,13 +16,15 @@
*/
package org.apache.carbondata.core.indexstore.row;
+import java.io.Serializable;
+
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
/**
* It is just a normal row to store data. Implementation classes could be safe and unsafe.
* TODO move this class a global row and use across loading after DataType is changed class
*/
-public abstract class DataMapRow {
+public abstract class DataMapRow implements Serializable {
protected CarbonRowSchema[] schemas;
@@ -88,4 +90,13 @@ public abstract class DataMapRow {
public int getColumnCount() {
return schemas.length;
}
+
+ /**
+ * Returns a safe (not unsafe-memory-backed) version of this row.
+ * The default implementation returns this row itself.
+ * NOTE(review): unsafe-backed subclasses such as UnsafeDataMapRow presumably override
+ * this to copy the data into a safe row - confirm.
+ *
+ * @return a safe version of this row; {@code this} by default
+ */
+ public DataMapRow convertToSafeRow() {
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 1b95984..323fb24 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -30,7 +30,12 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
*/
public class UnsafeDataMapRow extends DataMapRow {
- private MemoryBlock block;
+ private static final long serialVersionUID = -8649767299407770884L;
+
+ // This is an unsafe memory block, so it should not be serialized.
+ // If serialization is ever required, override writeObject so that it
+ // also takes care of clearing the unsafe memory after serialization.
+ private transient MemoryBlock block;
private int pointer;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index 813be4a..adb8715 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -16,12 +16,14 @@
*/
package org.apache.carbondata.core.indexstore.schema;
+import java.io.Serializable;
+
import org.apache.carbondata.core.metadata.datatype.DataType;
/**
* It just have 2 types right now, either fixed or variable.
*/
-public abstract class CarbonRowSchema {
+public abstract class CarbonRowSchema implements Serializable {
protected DataType dataType;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
new file mode 100644
index 0000000..766650d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+
+public class BlockletDataMapUtil {
+
+ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
+ TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
+ Set<String> filesRead) throws IOException {
+ if (identifier.getMergeIndexFileName() != null) {
+ CarbonFile indexMergeFile = FileFactory.getCarbonFile(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getMergeIndexFileName());
+ if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
+ indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+ filesRead.add(indexMergeFile.getPath());
+ }
+ }
+ if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+ indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName()) });
+ }
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+ List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+ for (DataFileFooter footer : indexInfo) {
+ String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
+ blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
+ }
+ return blockMetaInfoMap;
+ }
+
+ private static BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
+ if (carbonFile instanceof AbstractDFSCarbonFile) {
+ RemoteIterator<LocatedFileStatus> iter =
+ ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
+ LocatedFileStatus fileStatus = iter.next();
+ String[] location = fileStatus.getBlockLocations()[0].getHosts();
+ long len = fileStatus.getLen();
+ return new BlockMetaInfo(location, len);
+ } else {
+ return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
+ }
+ }
+
+ public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment,
+ String tablePath) throws IOException {
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>();
+ Map<String, String> indexFiles;
+ if (segment.getSegmentFileName() == null) {
+ String path = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
+ indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+ } else {
+ SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+ indexFiles = fileStore.getIndexFiles();
+ }
+ for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+ Path indexFile = new Path(indexFileEntry.getKey());
+ tableBlockIndexUniqueIdentifiers.add(
+ new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(),
+ indexFileEntry.getValue(), segment.getSegmentNo()));
+ }
+ return tableBlockIndexUniqueIdentifiers;
+ }
+
+ /**
+ * This method will filter out the TableBlockIndexUniqueIdentifiers belongs to that distributable
+ *
+ * @param tableBlockIndexUniqueIdentifiers
+ * @param distributable
+ * @return
+ */
+ public static Set<TableBlockIndexUniqueIdentifier> filterIdentifiersBasedOnDistributable(
+ Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
+ BlockletDataMapDistributable distributable) {
+ Set<TableBlockIndexUniqueIdentifier> validIdentifiers =
+ new HashSet<>(tableBlockIndexUniqueIdentifiers.size());
+ if (distributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+ tableBlockIndexUniqueIdentifiers) {
+ if (null != tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) {
+ validIdentifiers.add(tableBlockIndexUniqueIdentifier);
+ }
+ }
+ } else {
+ for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+ tableBlockIndexUniqueIdentifiers) {
+ if (null == tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) {
+ validIdentifiers.add(tableBlockIndexUniqueIdentifier);
+ }
+ }
+ }
+ return validIdentifiers;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index c232b1e..08b5929 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -203,6 +203,11 @@ public class SessionParams implements Serializable {
isValid = true;
} else if (key.equalsIgnoreCase(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)) {
isValid = true;
+ } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+ isValid = CarbonUtil.validateBoolean(value);
+ if (!isValid) {
+ throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
+ }
} else {
throw new InvalidConfigurationException(
"The key " + key + " not supported for dynamic configuration.");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
new file mode 100644
index 0000000..67c50e5
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BlockletDataMapFactory}: adding a datamap to the cache and filtering the
+ * distributables that are not yet cached.
+ */
+public class TestBlockletDataMapFactory {
+
+  private static final String SEGMENT_PATH =
+      "/opt/store/default/carbon_table/Fact/Part0/Segment_0";
+
+  private static final String INDEX_FILE_NAME = "0_batchno0-0-1521012756709.carbonindex";
+
+  private static final String MERGE_INDEX_FILE_NAME = "1521012756710.carbonindexmerge";
+
+  // NOTE(review): never assigned, so the factory is initialised with a null identifier.
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  private BlockletDataMapFactory blockletDataMapFactory;
+
+  private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
+  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+
+  @Before public void setUp() {
+    blockletDataMapFactory = new BlockletDataMapFactory();
+    blockletDataMapFactory.init(absoluteTableIdentifier, "dataMapName");
+    tableBlockIndexUniqueIdentifier =
+        new TableBlockIndexUniqueIdentifier(SEGMENT_PATH, INDEX_FILE_NAME, null, "0");
+    cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
+  }
+
+  /**
+   * Invokes the factory's {@code cache(DataMap)} method reflectively and checks that the
+   * datamap can then be looked up by its unique identifier.
+   */
+  @Test public void addDataMapToCache()
+      throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
+      IllegalAccessException {
+    BlockletDataMap dataMap = new BlockletDataMap();
+    dataMap.setTableBlockUniqueIdentifier(tableBlockIndexUniqueIdentifier);
+    Method method = BlockletDataMapFactory.class.getDeclaredMethod("cache", DataMap.class);
+    method.setAccessible(true);
+    method.invoke(blockletDataMapFactory, dataMap);
+    DataMap result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+    // JUnit assertion instead of the `assert` keyword, which is a no-op without -ea
+    org.junit.Assert.assertNotNull("datamap should be present in the cache", result);
+  }
+
+  /**
+   * With one standalone index file distributable and one merge index file distributable in the
+   * same segment, only one of them is expected to be reported as uncached.
+   */
+  @Test public void getValidDistributables() throws IOException {
+    Segment segment = new Segment("0", null);
+    BlockletDataMapDistributable indexFileDistributable =
+        new BlockletDataMapDistributable(SEGMENT_PATH + "/" + INDEX_FILE_NAME, null);
+    indexFileDistributable.setSegment(segment);
+    BlockletDataMapDistributable mergeIndexFileDistributable =
+        new BlockletDataMapDistributable(SEGMENT_PATH + "/" + MERGE_INDEX_FILE_NAME, null);
+    mergeIndexFileDistributable.setSegment(segment);
+    List<DataMapDistributable> dataMapDistributables = new ArrayList<>(2);
+    dataMapDistributables.add(indexFileDistributable);
+    dataMapDistributables.add(mergeIndexFileDistributable);
+    new MockUp<BlockletDataMapFactory>() {
+      @Mock Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+          Segment segment) {
+        Set<TableBlockIndexUniqueIdentifier> identifiers = new HashSet<>(3);
+        identifiers.add(tableBlockIndexUniqueIdentifier);
+        identifiers.add(new TableBlockIndexUniqueIdentifier(SEGMENT_PATH,
+            "0_batchno0-0-1521012756701.carbonindex", MERGE_INDEX_FILE_NAME, "0"));
+        identifiers.add(new TableBlockIndexUniqueIdentifier(SEGMENT_PATH,
+            "0_batchno0-0-1521012756702.carbonindex", MERGE_INDEX_FILE_NAME, "0"));
+        return identifiers;
+      }
+    };
+    List<DataMapDistributable> validDistributables =
+        blockletDataMapFactory.getAllUncachedDistributables(dataMapDistributables);
+    org.junit.Assert.assertEquals(1, validDistributables.size());
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 8be1e2e..9f67b22 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -16,21 +16,38 @@
*/
package org.apache.carbondata.hadoop;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
/**
* CacheClient : Holds all the Cache access clients for Btree, Dictionary
*/
public class CacheClient {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CacheClient.class.getName());
+
// segment access client for driver LRU cache
private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
segmentAccessClient;
+ private static Map<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>
+ segmentProperties =
+ new HashMap<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>();
+
public CacheClient() {
Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE);
@@ -45,4 +62,30 @@ public class CacheClient {
public void close() {
segmentAccessClient.close();
}
+
+ /**
+ * Method to get the segment properties and avoid construction of new segment properties until
+ * the schema is not modified
+ *
+ * @param tableIdentifier
+ * @param columnsInTable
+ * @param columnCardinality
+ */
+ public SegmentProperties getSegmentProperties(AbsoluteTableIdentifier tableIdentifier,
+ List<ColumnSchema> columnsInTable, int[] columnCardinality) {
+ SegmentTaskIndexStore.SegmentPropertiesWrapper segmentPropertiesWrapper =
+ new SegmentTaskIndexStore.SegmentPropertiesWrapper(tableIdentifier, columnsInTable,
+ columnCardinality);
+ SegmentProperties segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+ if (null == segmentProperties) {
+ // create a metadata details
+ // this will be useful in query handling
+ // all the data file metadata will have common segment properties we
+ // can use first one to get create the segment properties
+ LOGGER.info("Constructing new SegmentProperties");
+ segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
+ this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+ }
+ return segmentProperties;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
new file mode 100644
index 0000000..35d4b39
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Base {@link DataMapJob} whose two {@code execute} variants do nothing and return
+ * {@code null}. Subclasses may override either variant.
+ */
+public abstract class AbstractDataMapJob implements DataMapJob {
+
+  /**
+   * Default no-op implementation.
+   *
+   * @param carbonTable table the job would run for (unused)
+   * @param format      input format the job would use (unused)
+   * @return always {@code null}
+   */
+  @Override
+  public List<DataMap> execute(CarbonTable carbonTable, FileInputFormat<Void, DataMap> format) {
+    return null;
+  }
+
+  /**
+   * Default no-op implementation.
+   *
+   * @param dataMapFormat distributable datamap format (unused)
+   * @param resolverIntf  filter resolver (unused)
+   * @return always {@code null}
+   */
+  @Override
+  public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf resolverIntf) {
+    return null;
+  }
+}